1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.File; 22 import java.io.IOException; 23 import java.text.ParseException; 24 import java.util.ArrayList; 25 import java.util.Calendar; 26 import java.util.List; 27 import java.util.HashMap; 28 import java.util.Map; 29 import java.util.Iterator; 30 import java.util.regex.Matcher; 31 import java.util.regex.Pattern; 32 33 import junit.framework.TestCase; 34 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.fs.FileStatus; 37 import org.apache.hadoop.fs.FileSystem; 38 import org.apache.hadoop.fs.Path; 39 import org.apache.hadoop.fs.PathFilter; 40 import org.apache.hadoop.fs.permission.FsPermission; 41 import org.apache.hadoop.hdfs.MiniDFSCluster; 42 import org.apache.hadoop.mapred.JobHistory.*; 43 import org.apache.hadoop.mapred.QueueManager.QueueACL; 44 import org.apache.hadoop.mapreduce.JobACL; 45 import org.apache.hadoop.mapreduce.TaskType; 46 import org.apache.commons.logging.Log; 47 import org.apache.commons.logging.LogFactory; 48 import org.apache.hadoop.security.UserGroupInformation; 49 import org.apache.hadoop.security.authorize.AccessControlList; 50 51 /** 52 * Tests the JobHistory files - to catch any changes to JobHistory that can 53 * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer. 54 * 55 * testJobHistoryFile 56 * Run a job that will be succeeded and validate its history file format and 57 * content. 58 * 59 * testJobHistoryUserLogLocation 60 * Run jobs with the given values of hadoop.job.history.user.location as 61 * (1)null(default case), (2)"none", and (3)some user specified dir. 62 * Validate user history file location in each case. 63 * 64 * testJobHistoryJobStatus 65 * Run jobs that will be (1) succeeded (2) failed (3) killed. 66 * Validate job status read from history file in each case. 67 * 68 * Future changes to job history are to be reflected here in this file. 69 */ 70 public class TestJobHistory extends TestCase { 71 private static final Log LOG = LogFactory.getLog(TestJobHistory.class); 72 73 private static String TEST_ROOT_DIR = new File(System.getProperty( 74 "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); 75 76 private static final Pattern digitsPattern = 77 Pattern.compile(JobHistory.DIGITS); 78 79 // hostname like /default-rack/host1.foo.com OR host1.foo.com 80 private static final Pattern hostNamePattern = Pattern.compile( 81 "(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)"); 82 83 private static final String IP_ADDR = 84 "\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?"; 85 86 // hostname like /default-rack/host1.foo.com OR host1.foo.com 87 private static final Pattern trackerNamePattern = Pattern.compile( 88 "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" + 89 IP_ADDR + ":" + JobHistory.DIGITS); 90 91 private static final Pattern splitsPattern = Pattern.compile( 92 hostNamePattern + "(," + hostNamePattern + ")*"); 93 94 private static Map<String, List<String>> taskIDsToAttemptIDs = 95 new HashMap<String, List<String>>(); 96 97 //Each Task End seen from history file is added here 98 private static List<String> taskEnds = new ArrayList<String>(); 99 100 // List of tasks that appear in history file after JT reatart. This is to 101 // allow START_TIME=0 for these tasks. 102 private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>(); 103 104 // List of potential tasks whose start time can be 0 because of JT restart 105 private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>(); 106 107 /** 108 * Listener for history log file, it populates JobHistory.JobInfo 109 * object with data from log file and validates the data. 110 */ 111 static class TestListener 112 extends DefaultJobHistoryParser.JobTasksParseListener { 113 int lineNum;//line number of history log file 114 boolean isJobLaunched; 115 boolean isJTRestarted; 116 TestListener(JobHistory.JobInfo job)117 TestListener(JobHistory.JobInfo job) { 118 super(job); 119 lineNum = 0; 120 isJobLaunched = false; 121 isJTRestarted = false; 122 } 123 124 // TestListener implementation handle(RecordTypes recType, Map<Keys, String> values)125 public void handle(RecordTypes recType, Map<Keys, String> values) 126 throws IOException { 127 128 lineNum++; 129 130 // Check if the record is of type Meta 131 if (recType == JobHistory.RecordTypes.Meta) { 132 long version = Long.parseLong(values.get(Keys.VERSION)); 133 assertTrue("Unexpected job history version ", 134 (version >= 0 && version <= JobHistory.VERSION)); 135 } 136 else if (recType.equals(RecordTypes.Job)) { 137 String jobid = values.get(Keys.JOBID); 138 assertTrue("record type 'Job' is seen without JOBID key" + 139 " in history file at line " + lineNum, jobid != null); 140 JobID id = JobID.forName(jobid); 141 assertTrue("JobID in history file is in unexpected format " + 142 "at line " + lineNum, id != null); 143 String time = values.get(Keys.LAUNCH_TIME); 144 if (time != null) { 145 if (isJobLaunched) { 146 // We assume that if we see LAUNCH_TIME again, it is because of JT restart 147 isJTRestarted = true; 148 } 149 else {// job launched first time 150 isJobLaunched = true; 151 } 152 } 153 time = values.get(Keys.FINISH_TIME); 154 if (time != null) { 155 assertTrue ("Job FINISH_TIME is seen in history file at line " + 156 lineNum + " before LAUNCH_TIME is seen", isJobLaunched); 157 } 158 } 159 else if (recType.equals(RecordTypes.Task)) { 160 String taskid = values.get(Keys.TASKID); 161 assertTrue("record type 'Task' is seen without TASKID key" + 162 " in history file at line " + lineNum, taskid != null); 163 TaskID id = TaskID.forName(taskid); 164 assertTrue("TaskID in history file is in unexpected format " + 165 "at line " + lineNum, id != null); 166 167 String time = values.get(Keys.START_TIME); 168 if (time != null) { 169 List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); 170 assertTrue("Duplicate START_TIME seen for task " + taskid + 171 " in history file at line " + lineNum, attemptIDs == null); 172 attemptIDs = new ArrayList<String>(); 173 taskIDsToAttemptIDs.put(taskid, attemptIDs); 174 175 if (isJTRestarted) { 176 // This maintains a potential ignoreStartTimeTasks list 177 tempIgnoreStartTimeOfTasks.add(taskid); 178 } 179 } 180 181 time = values.get(Keys.FINISH_TIME); 182 if (time != null) { 183 String s = values.get(Keys.TASK_STATUS); 184 if (s != null) { 185 List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); 186 assertTrue ("Task FINISH_TIME is seen in history file at line " + 187 lineNum + " before START_TIME is seen", attemptIDs != null); 188 189 // Check if all the attemptIDs of this task are finished 190 assertTrue("TaskId " + taskid + " is finished at line " + 191 lineNum + " but its attemptID is not finished.", 192 (attemptIDs.size() <= 1)); 193 194 // Check if at least 1 attempt of this task is seen 195 assertTrue("TaskId " + taskid + " is finished at line " + 196 lineNum + " but no attemptID is seen before this.", 197 attemptIDs.size() == 1); 198 199 if (s.equals("KILLED") || s.equals("FAILED")) { 200 // Task End with KILLED/FAILED status in history file is 201 // considered as TaskEnd, TaskStart. This is useful in checking 202 // the order of history lines. 203 attemptIDs = new ArrayList<String>(); 204 taskIDsToAttemptIDs.put(taskid, attemptIDs); 205 } 206 else { 207 taskEnds.add(taskid); 208 } 209 } 210 else { 211 // This line of history file could be just an update to finish time 212 } 213 } 214 } 215 else if (recType.equals(RecordTypes.MapAttempt) || 216 recType.equals(RecordTypes.ReduceAttempt)) { 217 String taskid = values.get(Keys.TASKID); 218 assertTrue("record type " + recType + " is seen without TASKID key" + 219 " in history file at line " + lineNum, taskid != null); 220 221 String attemptId = values.get(Keys.TASK_ATTEMPT_ID); 222 TaskAttemptID id = TaskAttemptID.forName(attemptId); 223 assertTrue("AttemptID in history file is in unexpected format " + 224 "at line " + lineNum, id != null); 225 226 String time = values.get(Keys.START_TIME); 227 if (time != null) { 228 List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); 229 assertTrue ("TaskAttempt is seen in history file at line " + lineNum + 230 " before Task is seen", attemptIDs != null); 231 assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " + 232 "file at line " + lineNum, attemptIDs.remove(attemptId)); 233 234 if (attemptIDs.isEmpty()) { 235 //just a boolean whether any attempt is seen or not 236 attemptIDs.add("firstAttemptIsSeen"); 237 } 238 attemptIDs.add(attemptId); 239 240 if (tempIgnoreStartTimeOfTasks.contains(taskid) && 241 (id.getId() < 1000)) { 242 // If Task line of this attempt is seen in history file after 243 // JT restart and if this attempt is < 1000(i.e. attempt is noti 244 // started after JT restart) - assuming single JT restart happened 245 ignoreStartTimeOfTasks.add(taskid); 246 } 247 } 248 249 time = values.get(Keys.FINISH_TIME); 250 if (time != null) { 251 List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); 252 assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line " 253 + lineNum + " before Task is seen", attemptIDs != null); 254 255 assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line " 256 + lineNum + " before TaskAttempt START_TIME is seen", 257 attemptIDs.remove(attemptId)); 258 } 259 } 260 super.handle(recType, values); 261 } 262 } 263 264 // Check if the time is in the expected format isTimeValid(String time)265 private static boolean isTimeValid(String time) { 266 Matcher m = digitsPattern.matcher(time); 267 return m.matches() && (Long.parseLong(time) > 0); 268 } 269 areTimesInOrder(String time1, String time2)270 private static boolean areTimesInOrder(String time1, String time2) { 271 return (Long.parseLong(time1) <= Long.parseLong(time2)); 272 } 273 274 // Validate Format of Job Level Keys, Values read from history file validateJobLevelKeyValuesFormat(Map<Keys, String> values, String status)275 private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values, 276 String status) { 277 String time = values.get(Keys.SUBMIT_TIME); 278 assertTrue("Job SUBMIT_TIME is in unexpected format:" + time + 279 " in history file", isTimeValid(time)); 280 281 time = values.get(Keys.LAUNCH_TIME); 282 assertTrue("Job LAUNCH_TIME is in unexpected format:" + time + 283 " in history file", isTimeValid(time)); 284 285 String time1 = values.get(Keys.FINISH_TIME); 286 assertTrue("Job FINISH_TIME is in unexpected format:" + time1 + 287 " in history file", isTimeValid(time1)); 288 assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file", 289 areTimesInOrder(time, time1)); 290 291 String stat = values.get(Keys.JOB_STATUS); 292 assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" + 293 " history file", (status.equals(stat))); 294 295 String priority = values.get(Keys.JOB_PRIORITY); 296 assertTrue("Unknown priority for the job in history file", 297 (priority.equals("HIGH") || 298 priority.equals("LOW") || priority.equals("NORMAL") || 299 priority.equals("VERY_HIGH") || priority.equals("VERY_LOW"))); 300 } 301 302 // Validate Format of Task Level Keys, Values read from history file validateTaskLevelKeyValuesFormat(JobHistory.JobInfo job, boolean splitsCanBeEmpty)303 private static void validateTaskLevelKeyValuesFormat(JobHistory.JobInfo job, 304 boolean splitsCanBeEmpty) { 305 Map<String, JobHistory.Task> tasks = job.getAllTasks(); 306 307 // validate info of each task 308 for (JobHistory.Task task : tasks.values()) { 309 310 String tid = task.get(Keys.TASKID); 311 String time = task.get(Keys.START_TIME); 312 // We allow START_TIME=0 for tasks seen in history after JT restart 313 if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) { 314 assertTrue("Task START_TIME of " + tid + " is in unexpected format:" + 315 time + " in history file", isTimeValid(time)); 316 } 317 318 String time1 = task.get(Keys.FINISH_TIME); 319 assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" + 320 time1 + " in history file", isTimeValid(time1)); 321 assertTrue("Task FINISH_TIME is < START_TIME in history file", 322 areTimesInOrder(time, time1)); 323 324 // Make sure that the Task type exists and it is valid 325 String type = task.get(Keys.TASK_TYPE); 326 assertTrue("Unknown Task type \"" + type + "\" is seen in " + 327 "history file for task " + tid, 328 (type.equals("MAP") || type.equals("REDUCE") || 329 type.equals("SETUP") || type.equals("CLEANUP"))); 330 331 if (type.equals("MAP")) { 332 String splits = task.get(Keys.SPLITS); 333 //order in the condition OR check is important here 334 if (!splitsCanBeEmpty || splits.length() != 0) { 335 Matcher m = splitsPattern.matcher(splits); 336 assertTrue("Unexpected format of SPLITS \"" + splits + "\" is seen" + 337 " in history file for task " + tid, m.matches()); 338 } 339 } 340 341 // Validate task status 342 String status = task.get(Keys.TASK_STATUS); 343 assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" + 344 " history file for task " + tid, (status.equals("SUCCESS") || 345 status.equals("FAILED") || status.equals("KILLED"))); 346 } 347 } 348 349 // Validate foramt of Task Attempt Level Keys, Values read from history file validateTaskAttemptLevelKeyValuesFormat(JobHistory.JobInfo job)350 private static void validateTaskAttemptLevelKeyValuesFormat(JobHistory.JobInfo job) { 351 Map<String, JobHistory.Task> tasks = job.getAllTasks(); 352 353 // For each task 354 for (JobHistory.Task task : tasks.values()) { 355 // validate info of each attempt 356 for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) { 357 358 String id = attempt.get(Keys.TASK_ATTEMPT_ID); 359 String time = attempt.get(Keys.START_TIME); 360 assertTrue("START_TIME of task attempt " + id + 361 " is in unexpected format:" + time + 362 " in history file", isTimeValid(time)); 363 364 String time1 = attempt.get(Keys.FINISH_TIME); 365 assertTrue("FINISH_TIME of task attempt " + id + 366 " is in unexpected format:" + time1 + 367 " in history file", isTimeValid(time1)); 368 assertTrue("Task FINISH_TIME is < START_TIME in history file", 369 areTimesInOrder(time, time1)); 370 371 // Make sure that the Task type exists and it is valid 372 String type = attempt.get(Keys.TASK_TYPE); 373 assertTrue("Unknown Task type \"" + type + "\" is seen in " + 374 "history file for task attempt " + id, 375 (type.equals("MAP") || type.equals("REDUCE") || 376 type.equals("SETUP") || type.equals("CLEANUP"))); 377 378 // Validate task status 379 String status = attempt.get(Keys.TASK_STATUS); 380 assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" + 381 " history file for task attempt " + id, 382 (status.equals("SUCCESS") || status.equals("FAILED") || 383 status.equals("KILLED"))); 384 385 // Validate task Avataar 386 String avataar = attempt.get(Keys.AVATAAR); 387 assertTrue("Unexpected LOCALITY \"" + avataar + "\" is seen in " + 388 " history file for task attempt " + id, 389 (avataar.equals("VIRGIN") || avataar.equals("SPECULATIVE")) 390 ); 391 392 // Map Task Attempts should have valid LOCALITY 393 if (type.equals("MAP")) { 394 String locality = attempt.get(Keys.LOCALITY); 395 assertTrue("Unexpected LOCALITY \"" + locality + "\" is seen in " + 396 " history file for task attempt " + id, 397 (locality.equals("NODE_LOCAL") || locality.equals("GROUP_LOCAL") || 398 locality.equals("RACK_LOCAL") || locality.equals("OFF_SWITCH")) 399 ); 400 } 401 402 // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and 403 // SORT_FINISHED time 404 if (type.equals("REDUCE") && status.equals("SUCCESS")) { 405 time1 = attempt.get(Keys.SHUFFLE_FINISHED); 406 assertTrue("SHUFFLE_FINISHED time of task attempt " + id + 407 " is in unexpected format:" + time1 + 408 " in history file", isTimeValid(time1)); 409 assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " + 410 "in history file", areTimesInOrder(time, time1)); 411 time = attempt.get(Keys.SORT_FINISHED); 412 assertTrue("SORT_FINISHED of task attempt " + id + 413 " is in unexpected format:" + time + 414 " in history file", isTimeValid(time)); 415 assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" + 416 " in history file", areTimesInOrder(time1, time)); 417 } 418 419 // check if hostname is valid 420 String hostname = attempt.get(Keys.HOSTNAME); 421 Matcher m = hostNamePattern.matcher(hostname); 422 assertTrue("Unexpected Host name of task attempt " + id, m.matches()); 423 424 // check if trackername is valid 425 String trackerName = attempt.get(Keys.TRACKER_NAME); 426 m = trackerNamePattern.matcher(trackerName); 427 assertTrue("Unexpected tracker name of task attempt " + id, 428 m.matches()); 429 430 if (!status.equals("KILLED")) { 431 // check if http port is valid 432 String httpPort = attempt.get(Keys.HTTP_PORT); 433 m = digitsPattern.matcher(httpPort); 434 assertTrue("Unexpected http port of task attempt " + id, m.matches()); 435 } 436 437 // check if counters are parsable 438 String counters = attempt.get(Keys.COUNTERS); 439 try { 440 Counters readCounters = Counters.fromEscapedCompactString(counters); 441 assertTrue("Counters of task attempt " + id + " are not parsable", 442 readCounters != null); 443 } catch (ParseException pe) { 444 LOG.warn("While trying to parse counters of task attempt " + id + 445 ", " + pe); 446 } 447 } 448 } 449 } 450 451 /** 452 * Returns the conf file name in the same 453 * @param path path of the jobhistory file 454 * @param running whether the job is running or completed 455 */ getPathForConf(Path path)456 private static Path getPathForConf(Path path) { 457 return JobHistory.confPathFromLogFilePath(path); 458 } 459 460 /** 461 * Validates the format of contents of history file 462 * (1) history file exists and in correct location 463 * (2) Verify if the history file is parsable 464 * (3) Validate the contents of history file 465 * (a) Format of all TIMEs are checked against a regex 466 * (b) validate legality/format of job level key, values 467 * (c) validate legality/format of task level key, values 468 * (d) validate legality/format of attempt level key, values 469 * (e) check if all the TaskAttempts, Tasks started are finished. 470 * Check finish of each TaskAttemptID against its start to make sure 471 * that all TaskAttempts, Tasks started are indeed finished and the 472 * history log lines are in the proper order. 473 * We want to catch ordering of history lines like 474 * Task START 475 * Attempt START 476 * Task FINISH 477 * Attempt FINISH 478 * (speculative execution is turned off for this). 479 * @param id job id 480 * @param conf job conf 481 */ validateJobHistoryFileFormat(JobID id, JobConf conf, String status, boolean splitsCanBeEmpty)482 static void validateJobHistoryFileFormat(JobID id, JobConf conf, 483 String status, boolean splitsCanBeEmpty) throws IOException { 484 485 // Get the history file name 486 Path dir = JobHistory.getCompletedJobHistoryLocation(); 487 String logFileName = getDoneFile(conf, id, dir); 488 489 // Framework history log file location 490 Path logFile = new Path(dir, logFileName); 491 FileSystem fileSys = logFile.getFileSystem(conf); 492 493 // Check if the history file exists 494 assertTrue("History file does not exist", fileSys.exists(logFile)); 495 496 // Check that the log file name includes a directory level for the version number 497 assertTrue("History filename does not include a directory level " 498 + "for the version number.", 499 logFile.toString() 500 .contains("/" 501 + JobHistory.DONE_DIRECTORY_FORMAT_DIRNAME 502 + "/")); 503 504 // check if the history file is parsable 505 String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( 506 logFileName).split("_"); 507 508 String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; 509 JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); 510 511 TestListener l = new TestListener(jobInfo); 512 JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); 513 514 515 // validate format of job level key, values 516 validateJobLevelKeyValuesFormat(jobInfo.getValues(), status); 517 518 // validate format of task level key, values 519 validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty); 520 521 // validate format of attempt level key, values 522 validateTaskAttemptLevelKeyValuesFormat(jobInfo); 523 524 // check if all the TaskAttempts, Tasks started are finished for 525 // successful jobs 526 if (status.equals("SUCCESS")) { 527 // Make sure that the lists in taskIDsToAttemptIDs are empty. 528 for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) { 529 String taskid = it.next(); 530 assertTrue("There are some Tasks which are not finished in history " + 531 "file.", taskEnds.contains(taskid)); 532 List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid); 533 if(attemptIDs != null) { 534 assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" + 535 " that are not finished.", (attemptIDs.size() == 1)); 536 } 537 } 538 } 539 } 540 541 // Validate Job Level Keys, Values read from history file by 542 // comparing them with the actual values from JT. validateJobLevelKeyValues(MiniMRCluster mr, RunningJob job, JobHistory.JobInfo jobInfo, JobConf conf)543 private static void validateJobLevelKeyValues(MiniMRCluster mr, 544 RunningJob job, JobHistory.JobInfo jobInfo, JobConf conf) throws IOException { 545 546 JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); 547 JobInProgress jip = jt.getJob(job.getID()); 548 549 Map<Keys, String> values = jobInfo.getValues(); 550 551 assertTrue("SUBMIT_TIME of job obtained from history file did not " + 552 "match the expected value", jip.getStartTime() == 553 Long.parseLong(values.get(Keys.SUBMIT_TIME))); 554 555 assertTrue("LAUNCH_TIME of job obtained from history file did not " + 556 "match the expected value", jip.getLaunchTime() == 557 Long.parseLong(values.get(Keys.LAUNCH_TIME))); 558 559 assertTrue("FINISH_TIME of job obtained from history file did not " + 560 "match the expected value", jip.getFinishTime() == 561 Long.parseLong(values.get(Keys.FINISH_TIME))); 562 563 assertTrue("Job Status of job obtained from history file did not " + 564 "match the expected value", 565 values.get(Keys.JOB_STATUS).equals("SUCCESS")); 566 567 assertTrue("Job Priority of job obtained from history file did not " + 568 "match the expected value", jip.getPriority().toString().equals( 569 values.get(Keys.JOB_PRIORITY))); 570 571 assertTrue("Job Name of job obtained from history file did not " + 572 "match the expected value", JobHistory.JobInfo.getJobName(conf).equals( 573 values.get(Keys.JOBNAME))); 574 575 assertTrue("User Name of job obtained from history file did not " + 576 "match the expected value", JobHistory.JobInfo.getUserName(conf).equals( 577 values.get(Keys.USER))); 578 579 // Validate job counters 580 Counters c = new Counters(); 581 jip.getCounters(c); 582 assertTrue("Counters of job obtained from history file did not " + 583 "match the expected value", 584 c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS))); 585 Counters m = new Counters(); 586 jip.getMapCounters(m); 587 assertTrue("Map Counters of job obtained from history file did not " + 588 "match the expected value", m.makeEscapedCompactString(). 589 equals(values.get(Keys.MAP_COUNTERS))); 590 Counters r = new Counters(); 591 jip.getReduceCounters(r); 592 assertTrue("Reduce Counters of job obtained from history file did not " + 593 "match the expected value", r.makeEscapedCompactString(). 594 equals(values.get(Keys.REDUCE_COUNTERS))); 595 596 // Validate number of total maps, total reduces, finished maps, 597 // finished reduces, failed maps, failed recudes 598 String totalMaps = values.get(Keys.TOTAL_MAPS); 599 assertTrue("Unexpected number of total maps in history file", 600 Integer.parseInt(totalMaps) == jip.desiredMaps()); 601 602 String totalReduces = values.get(Keys.TOTAL_REDUCES); 603 assertTrue("Unexpected number of total reduces in history file", 604 Integer.parseInt(totalReduces) == jip.desiredReduces()); 605 606 String finMaps = values.get(Keys.FINISHED_MAPS); 607 assertTrue("Unexpected number of finished maps in history file", 608 Integer.parseInt(finMaps) == jip.finishedMaps()); 609 610 String finReduces = values.get(Keys.FINISHED_REDUCES); 611 assertTrue("Unexpected number of finished reduces in history file", 612 Integer.parseInt(finReduces) == jip.finishedReduces()); 613 614 String failedMaps = values.get(Keys.FAILED_MAPS); 615 assertTrue("Unexpected number of failed maps in history file", 616 Integer.parseInt(failedMaps) == jip.failedMapTasks); 617 618 String failedReduces = values.get(Keys.FAILED_REDUCES); 619 assertTrue("Unexpected number of failed reduces in history file", 620 Integer.parseInt(failedReduces) == jip.failedReduceTasks); 621 } 622 623 // Validate Task Level Keys, Values read from history file by 624 // comparing them with the actual values from JT. validateTaskLevelKeyValues(MiniMRCluster mr, RunningJob job, JobHistory.JobInfo jobInfo)625 private static void validateTaskLevelKeyValues(MiniMRCluster mr, 626 RunningJob job, JobHistory.JobInfo jobInfo) throws IOException { 627 628 JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); 629 JobInProgress jip = jt.getJob(job.getID()); 630 631 // Get the 1st map, 1st reduce, cleanup & setup taskIDs and 632 // validate their history info 633 TaskID mapTaskId = new TaskID(job.getID(), true, 0); 634 TaskID reduceTaskId = new TaskID(job.getID(), false, 0); 635 636 TaskInProgress cleanups[] = jip.getTasks(TaskType.JOB_CLEANUP); 637 TaskID cleanupTaskId; 638 if (cleanups[0].isComplete()) { 639 cleanupTaskId = cleanups[0].getTIPId(); 640 } 641 else { 642 cleanupTaskId = cleanups[1].getTIPId(); 643 } 644 645 TaskInProgress setups[] = jip.getTasks(TaskType.JOB_SETUP); 646 TaskID setupTaskId; 647 if (setups[0].isComplete()) { 648 setupTaskId = setups[0].getTIPId(); 649 } 650 else { 651 setupTaskId = setups[1].getTIPId(); 652 } 653 654 Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks(); 655 656 // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce) 657 for (JobHistory.Task task : tasks.values()) { 658 659 String tid = task.get(Keys.TASKID); 660 if (tid.equals(mapTaskId.toString()) || 661 tid.equals(reduceTaskId.toString()) || 662 tid.equals(cleanupTaskId.toString()) || 663 tid.equals(setupTaskId.toString())) { 664 665 TaskID taskId = null; 666 if (tid.equals(mapTaskId.toString())) { 667 taskId = mapTaskId; 668 } 669 else if (tid.equals(reduceTaskId.toString())) { 670 taskId = reduceTaskId; 671 } 672 else if (tid.equals(cleanupTaskId.toString())) { 673 taskId = cleanupTaskId; 674 } 675 else if (tid.equals(setupTaskId.toString())) { 676 taskId = setupTaskId; 677 } 678 TaskInProgress tip = jip.getTaskInProgress(taskId); 679 assertTrue("START_TIME of Task " + tid + " obtained from history " + 680 "file did not match the expected value", tip.getExecStartTime() == 681 Long.parseLong(task.get(Keys.START_TIME))); 682 683 assertTrue("FINISH_TIME of Task " + tid + " obtained from history " + 684 "file did not match the expected value", tip.getExecFinishTime() == 685 Long.parseLong(task.get(Keys.FINISH_TIME))); 686 687 if (taskId == mapTaskId) {//check splits only for map task 688 assertTrue("Splits of Task " + tid + " obtained from history file " + 689 " did not match the expected value", 690 tip.getSplitNodes().equals(task.get(Keys.SPLITS))); 691 } 692 693 TaskAttemptID attemptId = tip.getSuccessfulTaskid(); 694 TaskStatus ts = tip.getTaskStatus(attemptId); 695 696 // Validate task counters 697 Counters c = ts.getCounters(); 698 assertTrue("Counters of Task " + tid + " obtained from history file " + 699 " did not match the expected value", 700 c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS))); 701 } 702 } 703 } 704 705 // Validate Task Attempt Level Keys, Values read from history file by 706 // comparing them with the actual values from JT. validateTaskAttemptLevelKeyValues(MiniMRCluster mr, RunningJob job, JobHistory.JobInfo jobInfo)707 private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr, 708 RunningJob job, JobHistory.JobInfo jobInfo) throws IOException { 709 710 JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); 711 JobInProgress jip = jt.getJob(job.getID()); 712 713 Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks(); 714 715 // For each task 716 for (JobHistory.Task task : tasks.values()) { 717 // validate info of each attempt 718 for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) { 719 720 String idStr = attempt.get(Keys.TASK_ATTEMPT_ID); 721 TaskAttemptID attemptId = TaskAttemptID.forName(idStr); 722 TaskID tid = attemptId.getTaskID(); 723 724 // Validate task id 725 assertTrue("Task id of Task Attempt " + idStr + " obtained from " + 726 "history file did not match the expected value", 727 tid.toString().equals(attempt.get(Keys.TASKID))); 728 729 TaskInProgress tip = jip.getTaskInProgress(tid); 730 TaskStatus ts = tip.getTaskStatus(attemptId); 731 732 // Validate task attempt start time 733 assertTrue("START_TIME of Task attempt " + idStr + " obtained from " + 734 "history file did not match the expected value", 735 ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME))); 736 737 // Validate task attempt finish time 738 assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " + 739 "history file did not match the expected value", 740 ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME))); 741 742 743 TaskTrackerStatus ttStatus = 744 jt.getTaskTrackerStatus(ts.getTaskTracker()); 745 746 if (ttStatus != null) { 747 assertTrue("http port of task attempt " + idStr + " obtained from " + 748 "history file did not match the expected value", 749 ttStatus.getHttpPort() == 750 Integer.parseInt(attempt.get(Keys.HTTP_PORT))); 751 752 if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) { 753 String ttHostname = jt.getNode(ttStatus.getHost()).toString(); 754 755 // check if hostname is valid 756 assertTrue("Host name of task attempt " + idStr + " obtained from" + 757 " history file did not match the expected value", 758 ttHostname.equals(attempt.get(Keys.HOSTNAME))); 759 } 760 } 761 if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) { 762 // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of 763 // Reduce Task Attempts 764 if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) { 765 assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr + 766 " obtained from history file did not match the expected" + 767 " value", ts.getShuffleFinishTime() == 768 Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED))); 769 assertTrue("SORT_FINISHED time of task attempt " + idStr + 770 " obtained from history file did not match the expected" + 771 " value", ts.getSortFinishTime() == 772 Long.parseLong(attempt.get(Keys.SORT_FINISHED))); 773 } 774 775 //Validate task counters 776 Counters c = ts.getCounters(); 777 assertTrue("Counters of Task Attempt " + idStr + " obtained from " + 778 "history file did not match the expected value", 779 c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS))); 780 } 781 782 // check if tracker name is valid 783 assertTrue("Tracker name of task attempt " + idStr + " obtained from " + 784 "history file did not match the expected value", 785 ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME))); 786 } 787 } 788 } 789 790 /** 791 * Checks if the history file content is as expected comparing with the 792 * actual values obtained from JT. 793 * Job Level, Task Level and Task Attempt Level Keys, Values are validated. 794 * @param job RunningJob object of the job whose history is to be validated 795 * @param conf job conf 796 */ validateJobHistoryFileContent(MiniMRCluster mr, RunningJob job, JobConf conf)797 static void validateJobHistoryFileContent(MiniMRCluster mr, 798 RunningJob job, JobConf conf) throws IOException { 799 800 JobID id = job.getID(); 801 Path doneDir = JobHistory.getCompletedJobHistoryLocation(); 802 // Get the history file name 803 String logFileName = getDoneFile(conf, id, doneDir); 804 805 // Framework history log file location 806 Path logFile = new Path(doneDir, logFileName); 807 FileSystem fileSys = logFile.getFileSystem(conf); 808 809 // Check if the history file exists 810 assertTrue("History file does not exist", fileSys.exists(logFile)); 811 812 813 // check if the history file is parsable 814 String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( 815 logFileName).split("_"); 816 817 String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; 818 JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); 819 820 DefaultJobHistoryParser.JobTasksParseListener l = 821 new DefaultJobHistoryParser.JobTasksParseListener(jobInfo); 822 JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); 823 824 // Now the history file contents are available in jobInfo. Let us compare 825 // them with the actual values from JT. 826 validateJobLevelKeyValues(mr, job, jobInfo, conf); 827 validateTaskLevelKeyValues(mr, job, jobInfo); 828 validateTaskAttemptLevelKeyValues(mr, job, jobInfo); 829 830 // Also JobACLs should be correct 831 if (mr.getJobTrackerRunner().getJobTracker().areACLsEnabled()) { 832 AccessControlList acl = new AccessControlList( 833 conf.get(JobACL.VIEW_JOB.getAclName(), " ")); 834 assertTrue(acl.toString().equals( 835 jobInfo.getJobACLs().get(JobACL.VIEW_JOB).toString())); 836 acl = new AccessControlList( 837 conf.get(JobACL.MODIFY_JOB.getAclName(), " ")); 838 assertTrue(acl.toString().equals( 839 jobInfo.getJobACLs().get(JobACL.MODIFY_JOB).toString())); 840 } 841 842 // Validate the job queue name 843 assertTrue(jobInfo.getJobQueue().equals(conf.getQueueName())); 844 845 // Validate the workflow properties 846 assertTrue(jobInfo.get(Keys.WORKFLOW_ID).equals( 847 conf.get(JobConf.WORKFLOW_ID, ""))); 848 assertTrue(jobInfo.get(Keys.WORKFLOW_NAME).equals( 849 conf.get(JobConf.WORKFLOW_NAME, ""))); 850 assertTrue(jobInfo.get(Keys.WORKFLOW_NODE_NAME).equals( 851 conf.get(JobConf.WORKFLOW_NODE_NAME, ""))); 852 assertTrue(jobInfo.get(Keys.WORKFLOW_ADJACENCIES).equals( 853 JobHistory.JobInfo.getWorkflowAdjacencies(conf))); 854 assertTrue(jobInfo.get(Keys.WORKFLOW_TAGS).equals( 855 conf.get(JobConf.WORKFLOW_TAGS, ""))); 856 } 857 testDoneFolderOnHDFS()858 public void testDoneFolderOnHDFS() throws IOException { 859 MiniMRCluster mr = null; 860 try { 861 JobConf conf = new JobConf(); 862 // keep for less time 863 conf.setLong("mapred.jobtracker.retirejob.check", 1000); 864 conf.setLong("mapred.jobtracker.retirejob.interval", 100000); 865 866 //set the done folder location 867 String doneFolder = "history_done"; 868 conf.set("mapred.job.tracker.history.completed.location", doneFolder); 869 870 MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null); 871 mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 872 3, null, null, conf); 873 874 // run the TCs 875 conf = mr.createJobConf(); 876 877 FileSystem fs = FileSystem.get(conf); 878 // clean up 879 fs.delete(new Path("succeed"), true); 880 881 Path inDir = new Path("succeed/input"); 882 Path outDir = new Path("succeed/output"); 883 884 //Disable speculative execution 885 conf.setSpeculativeExecution(false); 886 887 // Make sure that the job is not removed from memory until we do finish 888 // the validation of history file content 889 conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10); 890 conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName()); 891 // Run a job that will be succeeded and validate its history file 892 RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); 893 894 Path doneDir = JobHistory.getCompletedJobHistoryLocation(); 895 assertEquals("History DONE folder not correct", 896 doneFolder, doneDir.getName()); 897 JobID id = job.getID(); 898 String logFileName = getDoneFile(conf, id, doneDir); 899 assertNotNull(logFileName); 900 System.err.println("testDoneFolderOnHDFS -- seeking " + logFileName); 901 // Framework history log file location 902 Path logFile = new Path(doneDir, logFileName); 903 FileSystem fileSys = logFile.getFileSystem(conf); 904 905 // Check if the history file exists 906 assertTrue("History file does not exist", fileSys.exists(logFile)); 907 908 // check if the corresponding conf file exists 909 Path confFile = getPathForConf(logFile); 910 assertTrue("Config for completed jobs doesnt exist: " + confFile, 911 fileSys.exists(confFile)); 912 913 // check if the file exists under a done folder 914 assertTrue("Completed job config doesnt exist under the done folder", 915 confFile.toString().startsWith(doneDir.toString())); 916 917 // check if the file exists in a done folder 918 assertTrue("Completed jobs doesnt exist under the done folder", 919 logFile.toString().startsWith(doneDir.toString())); 920 921 assertTrue("Completed job and config file aren't in the same directory", 922 confFile.getParent().toString().equals(logFile.getParent().toString())); 923 924 // Test that all of the ancestors of the log file have the same 925 // permissions as the done directory 926 927 Path cursor = logFile.getParent(); 928 929 Path doneParent = doneDir.getParent(); 930 931 FsPermission donePermission = getStatus(fileSys, doneDir).getPermission(); 932 933 System.err.println("testDoneFolderOnHDFS: done dir permission = " 934 + donePermission); 935 936 while (!cursor.equals(doneParent)) { 937 FileStatus cursorStatus = getStatus(fileSys, cursor); 938 FsPermission cursorPermission = cursorStatus.getPermission(); 939 940 assertEquals("testDoneFolderOnHDFS: A done directory descendant, " 941 + cursor 942 + " does not have the same permisison as the done directory, " 943 + doneDir, 944 donePermission, 945 cursorPermission); 946 947 cursor = cursor.getParent(); 948 } 949 950 // check if the job file is removed from the history location 951 Path runningJobsHistoryFolder = logFile.getParent().getParent(); 952 Path runningJobHistoryFilename = 953 new Path(runningJobsHistoryFolder, logFile.getName()); 954 Path runningJobConfFilename = 955 new Path(runningJobsHistoryFolder, confFile.getName()); 956 assertFalse("History file not deleted from the running folder", 957 fileSys.exists(runningJobHistoryFilename)); 958 assertFalse("Config for completed jobs not deleted from running folder", 959 fileSys.exists(runningJobConfFilename)); 960 961 validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false); 962 validateJobHistoryFileContent(mr, job, conf); 963 964 // get the job conf filename 965 } finally { 966 if (mr != null) { 967 cleanupLocalFiles(mr); 968 mr.shutdown(); 969 } 970 } 971 } 972 getStatus(FileSystem fs, final Path path)973 private static FileStatus getStatus(FileSystem fs, final Path path) { 974 Path pathParent = path.getParent(); 975 976 try { 977 FileStatus[] statuses 978 = fs.listStatus(pathParent, 979 new PathFilter() { 980 @Override 981 public boolean accept(Path filterPath) { 982 return filterPath.getName().equals(path.getName()); 983 } 984 } 985 ); 986 987 return statuses[0]; 988 } catch (IOException e) { 989 return null; 990 } 991 } 992 993 /** Run a job that will be succeeded and validate its history file format 994 * and its content. 995 */ testJobHistoryFile()996 public void testJobHistoryFile() throws IOException { 997 MiniMRCluster mr = null; 998 try { 999 JobConf conf = new JobConf(); 1000 // keep for less time 1001 conf.setLong("mapred.jobtracker.retirejob.check", 1000); 1002 conf.setLong("mapred.jobtracker.retirejob.interval", 100000); 1003 1004 //set the done folder location 1005 String doneFolder = TEST_ROOT_DIR + "history_done"; 1006 conf.set("mapred.job.tracker.history.completed.location", doneFolder); 1007 1008 // Enable ACLs so that they are logged to history 1009 conf.setBoolean(JobConf.MR_ACLS_ENABLED, true); 1010 // no queue admins for default queue 1011 conf.set(QueueManager.toFullPropertyName( 1012 "default", QueueACL.ADMINISTER_JOBS.getAclName()), " "); 1013 1014 // set workflow properties 1015 conf.set(JobConf.WORKFLOW_ID, "workflowId1"); 1016 conf.set(JobConf.WORKFLOW_NAME, "workflowName1"); 1017 String workflowNodeName = "A"; 1018 conf.set(JobConf.WORKFLOW_NODE_NAME, workflowNodeName); 1019 conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName, 1020 "BC"); 1021 conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName, 1022 "DEF"); 1023 conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "DEF", "G"); 1024 conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "Z", 1025 workflowNodeName); 1026 conf.set(JobConf.WORKFLOW_TAGS, "tag1,tag2"); 1027 1028 mr = new MiniMRCluster(2, "file:///", 3, null, null, conf); 1029 1030 // run the TCs 1031 conf = mr.createJobConf(); 1032 1033 FileSystem fs = FileSystem.get(conf); 1034 // clean up 1035 fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true); 1036 1037 Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input"); 1038 Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output"); 1039 1040 //Disable speculative execution 1041 conf.setSpeculativeExecution(false); 1042 conf.set(JobACL.VIEW_JOB.getAclName(), "user1,user2 group1,group2"); 1043 conf.set(JobACL.MODIFY_JOB.getAclName(), "user3,user4 group3,group4"); 1044 1045 // Make sure that the job is not removed from memory until we do finish 1046 // the validation of history file content 1047 conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10); 1048 conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName()); 1049 // Run a job that will be succeeded and validate its history file 1050 RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); 1051 1052 Path doneDir = JobHistory.getCompletedJobHistoryLocation(); 1053 assertEquals("History DONE folder not correct", 1054 doneFolder, doneDir.toString()); 1055 JobID id = job.getID(); 1056 String logFileName = getDoneFile(conf, id, doneDir); 1057 1058 // Framework history log file location 1059 Path logFile = new Path(doneDir, logFileName); 1060 FileSystem fileSys = logFile.getFileSystem(conf); 1061 1062 // Check if the history file exists 1063 System.err.println("testJobHistoryFile -- seeking " + logFile); 1064 assertTrue("History file does not exist", fileSys.exists(logFile)); 1065 1066 // check if the corresponding conf file exists 1067 Path confFile = getPathForConf(logFile); 1068 assertTrue("Config for completed jobs doesnt exist: " + confFile, 1069 fileSys.exists(confFile)); 1070 1071 // check if the file exists in a done folder 1072 assertTrue("Completed job config doesnt exist under the done folder", 1073 confFile.toString().startsWith(doneDir.toString())); 1074 1075 // check if the file exists in a done folder 1076 assertTrue("Completed jobs doesnt exist in the done folder", 1077 logFile.toString().startsWith(doneDir.toString())); 1078 1079 assertTrue("Completed job and config file aren't in the same directory", 1080 confFile.getParent().toString().equals(logFile.getParent().toString())); 1081 1082 1083 // check if the job file is removed from the history location 1084 Path runningJobsHistoryFolder = logFile.getParent().getParent(); 1085 Path runningJobHistoryFilename = 1086 new Path(runningJobsHistoryFolder, logFile.getName()); 1087 Path runningJobConfFilename = 1088 new Path(runningJobsHistoryFolder, confFile.getName()); 1089 assertFalse("History file not deleted from the running folder", 1090 fileSys.exists(runningJobHistoryFilename)); 1091 assertFalse("Config for completed jobs not deleted from running folder", 1092 fileSys.exists(runningJobConfFilename)); 1093 1094 validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false); 1095 validateJobHistoryFileContent(mr, job, conf); 1096 1097 // get the job conf filename 1098 String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID()); 1099 File file = new File(name); 1100 1101 // check if the file get deleted 1102 while (file.exists()) { 1103 LOG.info("Waiting for " + file + " to be deleted"); 1104 UtilsForTests.waitFor(100); 1105 } 1106 } finally { 1107 if (mr != null) { 1108 cleanupLocalFiles(mr); 1109 mr.shutdown(); 1110 } 1111 } 1112 } 1113 1114 //Returns the file in the done folder 1115 //Waits for sometime to get the file moved to done getDoneFile(JobConf conf, JobID id, Path doneDir)1116 static String getDoneFile(JobConf conf, JobID id, 1117 Path doneDir) throws IOException { 1118 String name = null; 1119 for (int i = 0; name == null && i < 20; i++) { 1120 name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id); 1121 UtilsForTests.waitFor(1000); 1122 } 1123 return name; 1124 } 1125 // Returns the output path where user history log file is written to with 1126 // default configuration setting for hadoop.job.history.user.location getLogLocationInOutputPath(String logFileName, JobConf conf)1127 private static Path getLogLocationInOutputPath 1128 (String logFileName, JobConf conf) { 1129 JobConf jobConf = new JobConf(true);//default JobConf 1130 FileOutputFormat.setOutputPath(jobConf, 1131 FileOutputFormat.getOutputPath(conf)); 1132 1133 Path result = JobHistory.JobInfo.getJobHistoryLogLocationForUser 1134 (logFileName, jobConf); 1135 return result; 1136 } 1137 coreLogLocation(String subdirLogLocation)1138 static private String coreLogLocation(String subdirLogLocation) { 1139 return subdirLogLocation.substring 1140 (subdirLogLocation.lastIndexOf(Path.SEPARATOR_CHAR) + 1); 1141 } 1142 1143 /** 1144 * Checks if the user history file exists in the correct dir 1145 * @param id job id 1146 * @param conf job conf 1147 */ validateJobHistoryUserLogLocation(JobID id, JobConf conf)1148 private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 1149 throws IOException { 1150 // Get the history file name 1151 Path doneDir = JobHistory.getCompletedJobHistoryLocation(); 1152 String logFileName = getDoneFile(conf, id, doneDir); 1153 1154 // User history log file location 1155 Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser( 1156 coreLogLocation(logFileName), conf); 1157 1158 if(logFile == null) { 1159 // get the output path where history file is written to when 1160 // hadoop.job.history.user.location is not set 1161 1162 logFile = getLogLocationInOutputPath(coreLogLocation(logFileName), conf); 1163 } 1164 1165 FileSystem fileSys = null; 1166 fileSys = logFile.getFileSystem(conf); 1167 1168 // Check if the user history file exists in the correct dir 1169 if (conf.get("hadoop.job.history.user.location") == null) { 1170 assertTrue("User log file " + logFile + " does not exist", 1171 fileSys.exists(logFile)); 1172 } 1173 else if ("none".equals(conf.get("hadoop.job.history.user.location"))) { 1174 // history file should not exist in the output path 1175 assertFalse("Unexpected. User log file exists in output dir when " + 1176 "hadoop.job.history.user.location is set to \"none\"", 1177 fileSys.exists(logFile)); 1178 } 1179 else { 1180 //hadoop.job.history.user.location is set to a specific location. 1181 // User log file should exist in that location 1182 assertTrue("User log file " + logFile + " does not exist", 1183 fileSys.exists(logFile)); 1184 1185 // User log file should not exist in output path. 1186 1187 // get the output path where history file is written to when 1188 // hadoop.job.history.user.location is not set 1189 Path logFile1 = getLogLocationInOutputPath(logFileName, conf); 1190 1191 if (logFile != logFile1) { 1192 fileSys = logFile1.getFileSystem(conf); 1193 assertFalse("Unexpected. User log file exists in output dir when " + 1194 "hadoop.job.history.user.location is set to a different location", 1195 fileSys.exists(logFile1)); 1196 } 1197 } 1198 } 1199 1200 // Validate user history file location for the given values of 1201 // hadoop.job.history.user.location as 1202 // (1)null(default case), (2)"none", and (3)some user specified dir. testJobHistoryUserLogLocation()1203 public void testJobHistoryUserLogLocation() throws IOException { 1204 MiniMRCluster mr = null; 1205 try { 1206 mr = new MiniMRCluster(2, "file:///", 3); 1207 1208 // run the TCs 1209 JobConf conf = mr.createJobConf(); 1210 1211 FileSystem fs = FileSystem.get(conf); 1212 // clean up 1213 fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true); 1214 1215 Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1"); 1216 Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1"); 1217 conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName()); 1218 // validate for the case of null(default) 1219 RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); 1220 validateJobHistoryUserLogLocation(job.getID(), conf); 1221 1222 inDir = new Path(TEST_ROOT_DIR + "/succeed/input2"); 1223 outDir = new Path(TEST_ROOT_DIR + "/succeed/output2"); 1224 // validate for the case of "none" 1225 conf.set("hadoop.job.history.user.location", "none"); 1226 job = UtilsForTests.runJobSucceed(conf, inDir, outDir); 1227 validateJobHistoryUserLogLocation(job.getID(), conf); 1228 1229 inDir = new Path(TEST_ROOT_DIR + "/succeed/input3"); 1230 outDir = new Path(TEST_ROOT_DIR + "/succeed/output3"); 1231 // validate for the case of any dir 1232 conf.set("hadoop.job.history.user.location", TEST_ROOT_DIR + "/succeed"); 1233 job = UtilsForTests.runJobSucceed(conf, inDir, outDir); 1234 validateJobHistoryUserLogLocation(job.getID(), conf); 1235 1236 } finally { 1237 if (mr != null) { 1238 cleanupLocalFiles(mr); 1239 mr.shutdown(); 1240 } 1241 } 1242 } 1243 cleanupLocalFiles(MiniMRCluster mr)1244 private void cleanupLocalFiles(MiniMRCluster mr) 1245 throws IOException { 1246 Configuration conf = mr.createJobConf(); 1247 JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); 1248 Path sysDir = new Path(jt.getSystemDir()); 1249 FileSystem fs = sysDir.getFileSystem(conf); 1250 fs.delete(sysDir, true); 1251 Path jobHistoryDir = JobHistory.getJobHistoryLocation(); 1252 fs = jobHistoryDir.getFileSystem(conf); 1253 fs.delete(jobHistoryDir, true); 1254 } 1255 1256 /** 1257 * Checks if the history file has expected job status 1258 * @param id job id 1259 * @param conf job conf 1260 */ validateJobHistoryJobStatus(JobID id, JobConf conf, String status)1261 private static void validateJobHistoryJobStatus(JobID id, JobConf conf, 1262 String status) throws IOException { 1263 1264 // Get the history file name 1265 Path doneDir = JobHistory.getCompletedJobHistoryLocation(); 1266 String logFileName = getDoneFile(conf, id, doneDir); 1267 1268 // Framework history log file location 1269 Path logFile = new Path(doneDir, logFileName); 1270 FileSystem fileSys = logFile.getFileSystem(conf); 1271 1272 // Check if the history file exists 1273 System.err.println("validateJobHistoryJobStatus -- seeking " + logFile); 1274 assertTrue("History file does not exist", fileSys.exists(logFile)); 1275 1276 // check history file permission 1277 assertTrue("History file permissions does not match", 1278 fileSys.getFileStatus(logFile).getPermission().equals( 1279 new FsPermission(JobHistory.HISTORY_FILE_PERMISSION))); 1280 1281 // check if the history file is parsable 1282 String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName( 1283 logFileName).split("_"); 1284 1285 String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4]; 1286 JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId); 1287 1288 DefaultJobHistoryParser.JobTasksParseListener l = 1289 new DefaultJobHistoryParser.JobTasksParseListener(jobInfo); 1290 JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys); 1291 1292 assertTrue("Job Status read from job history file is not the expected" + 1293 " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS))); 1294 } 1295 1296 // run jobs that will be (1) succeeded (2) failed (3) killed 1297 // and validate job status read from history file in each case testJobHistoryJobStatus()1298 public void testJobHistoryJobStatus() throws IOException { 1299 MiniMRCluster mr = null; 1300 try { 1301 mr = new MiniMRCluster(2, "file:///", 3); 1302 1303 // run the TCs 1304 JobConf conf = mr.createJobConf(); 1305 1306 FileSystem fs = FileSystem.get(conf); 1307 // clean up 1308 fs.delete(new Path(TEST_ROOT_DIR + "/succeedfailkilljob"), true); 1309 1310 Path inDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/input"); 1311 Path outDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/output"); 1312 conf.set("user.name", UserGroupInformation.getCurrentUser().getUserName()); 1313 // Run a job that will be succeeded and validate its job status 1314 // existing in history file 1315 RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir); 1316 validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS"); 1317 long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan(); 1318 assertTrue(historyCleanerRanAt != 0); 1319 1320 // Run a job that will be failed and validate its job status 1321 // existing in history file 1322 job = UtilsForTests.runJobFail(conf, inDir, outDir); 1323 validateJobHistoryJobStatus(job.getID(), conf, "FAILED"); 1324 assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan()); 1325 1326 // Run a job that will be killed and validate its job status 1327 // existing in history file 1328 job = UtilsForTests.runJobKill(conf, inDir, outDir); 1329 validateJobHistoryJobStatus(job.getID(), conf, "KILLED"); 1330 assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan()); 1331 1332 } finally { 1333 if (mr != null) { 1334 cleanupLocalFiles(mr); 1335 mr.shutdown(); 1336 } 1337 } 1338 } 1339 testJobHistoryCleaner()1340 public void testJobHistoryCleaner() throws Exception { 1341 JobConf conf = new JobConf(); 1342 FileSystem fs = FileSystem.get(conf); 1343 JobHistory.DONEDIR_FS = fs; 1344 JobHistory.DONE = new Path(TEST_ROOT_DIR + "/done"); 1345 Path histDirOld = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/05/000000/"); 1346 Path histDirOnLine = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/06/000000/"); 1347 final int dayMillis = 1000 * 60 * 60 * 24; 1348 1349 try { 1350 Calendar runTime = Calendar.getInstance(); 1351 runTime.clear(); 1352 runTime.set(2013, 1, 8, 12, 0); 1353 long runTimeMillis = runTime.getTimeInMillis(); 1354 1355 fs.mkdirs(histDirOld); 1356 fs.mkdirs(histDirOnLine); 1357 Path histFileOldDir = new Path(histDirOld, "jobfile1.txt"); 1358 Path histFileOnLineDir = new Path(histDirOnLine, "jobfile1.txt"); 1359 Path histFileDontDelete = new Path(histDirOnLine, "jobfile2.txt"); 1360 fs.create(histFileOldDir).close(); 1361 fs.create(histFileOnLineDir).close(); 1362 fs.create(histFileDontDelete).close(); 1363 new File(histFileOnLineDir.toUri()).setLastModified( 1364 runTimeMillis - dayMillis * 5 / 2); 1365 new File(histFileDontDelete.toUri()).setLastModified( 1366 runTimeMillis - dayMillis * 3 / 2); 1367 1368 HistoryCleaner.maxAgeOfHistoryFiles = dayMillis * 2; // two days 1369 HistoryCleaner historyCleaner = new HistoryCleaner(); 1370 1371 historyCleaner.clean(runTimeMillis); 1372 1373 assertFalse(fs.exists(histDirOld)); 1374 assertTrue(fs.exists(histDirOnLine)); 1375 assertFalse(fs.exists(histFileOldDir)); 1376 assertFalse(fs.exists(histFileOnLineDir)); 1377 assertTrue(fs.exists(histFileDontDelete)); 1378 } finally { 1379 fs.delete(JobHistory.DONE, true); 1380 } 1381 } 1382 } 1383