1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.v2.hs; 20 21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic 22 .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; 23 import static org.junit.Assert.assertEquals; 24 import static org.junit.Assert.assertNotNull; 25 import static org.junit.Assert.assertNull; 26 import static org.junit.Assert.assertTrue; 27 28 import java.io.ByteArrayOutputStream; 29 import java.io.IOException; 30 import java.io.PrintStream; 31 import java.util.Arrays; 32 import java.util.Collections; 33 import java.util.HashMap; 34 import java.util.List; 35 import java.util.Map; 36 import java.util.StringTokenizer; 37 import java.util.concurrent.atomic.AtomicInteger; 38 39 import org.junit.Assert; 40 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 import org.apache.hadoop.conf.Configuration; 44 import org.apache.hadoop.fs.FSDataInputStream; 45 import org.apache.hadoop.fs.FileContext; 46 import org.apache.hadoop.fs.FileSystem; 47 import org.apache.hadoop.fs.Path; 48 import org.apache.hadoop.mapreduce.Counters; 49 import org.apache.hadoop.mapreduce.JobID; 50 import org.apache.hadoop.mapreduce.MRJobConfig; 51 import org.apache.hadoop.mapreduce.TaskID; 52 import org.apache.hadoop.mapreduce.TypeConverter; 53 import org.apache.hadoop.mapreduce.jobhistory.EventReader; 54 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; 55 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; 56 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; 57 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo; 58 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; 59 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; 60 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; 61 import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; 62 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; 63 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; 64 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; 65 import org.apache.hadoop.mapreduce.v2.api.records.JobId; 66 import org.apache.hadoop.mapreduce.v2.api.records.JobState; 67 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; 68 import org.apache.hadoop.mapreduce.v2.api.records.TaskId; 69 import org.apache.hadoop.mapreduce.v2.api.records.TaskState; 70 import org.apache.hadoop.mapreduce.v2.api.records.TaskType; 71 import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl; 72 import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl; 73 import org.apache.hadoop.mapreduce.v2.app.MRApp; 74 import org.apache.hadoop.mapreduce.v2.app.job.Job; 75 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; 76 import org.apache.hadoop.mapreduce.v2.app.job.Task; 77 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; 78 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; 79 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; 80 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; 81 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; 82 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; 83 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; 84 import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; 85 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 86 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 87 import org.apache.hadoop.net.DNSToSwitchMapping; 88 import org.apache.hadoop.security.UserGroupInformation; 89 import org.apache.hadoop.service.Service; 90 import org.apache.hadoop.yarn.api.records.ContainerId; 91 import org.apache.hadoop.yarn.util.RackResolver; 92 import org.junit.Test; 93 import org.mockito.Mockito; 94 import org.mockito.invocation.InvocationOnMock; 95 import org.mockito.stubbing.Answer; 96 97 public class TestJobHistoryParsing { 98 private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class); 99 100 private static final String RACK_NAME = "/MyRackName"; 101 102 private ByteArrayOutputStream outContent = new ByteArrayOutputStream(); 103 104 public static class MyResolver implements DNSToSwitchMapping { 105 @Override resolve(List<String> names)106 public List<String> resolve(List<String> names) { 107 return Arrays.asList(new String[] { RACK_NAME }); 108 } 109 110 @Override reloadCachedMappings()111 public void reloadCachedMappings() { 112 } 113 114 @Override reloadCachedMappings(List<String> names)115 public void reloadCachedMappings(List<String> names) { 116 } 117 } 118 119 @Test(timeout = 50000) testJobInfo()120 public void testJobInfo() throws Exception { 121 JobInfo info = new JobInfo(); 122 Assert.assertEquals("NORMAL", info.getPriority()); 123 info.printAll(); 124 } 125 126 @Test(timeout = 300000) testHistoryParsing()127 public void testHistoryParsing() throws Exception { 128 LOG.info("STARTING testHistoryParsing()"); 129 try { 130 checkHistoryParsing(2, 1, 2); 131 } finally { 132 LOG.info("FINISHED testHistoryParsing()"); 133 } 134 } 135 136 @Test(timeout = 50000) testHistoryParsingWithParseErrors()137 public void testHistoryParsingWithParseErrors() throws Exception { 138 LOG.info("STARTING testHistoryParsingWithParseErrors()"); 139 try { 140 checkHistoryParsing(3, 0, 2); 141 } finally { 142 LOG.info("FINISHED testHistoryParsingWithParseErrors()"); 143 } 144 } 145 getJobSummary(FileContext fc, Path path)146 private static String getJobSummary(FileContext fc, Path path) 147 throws IOException { 148 Path qPath = fc.makeQualified(path); 149 FSDataInputStream in = fc.open(qPath); 150 String jobSummaryString = in.readUTF(); 151 in.close(); 152 return jobSummaryString; 153 } 154 checkHistoryParsing(final int numMaps, final int numReduces, final int numSuccessfulMaps)155 private void checkHistoryParsing(final int numMaps, final int numReduces, 156 final int numSuccessfulMaps) throws Exception { 157 Configuration conf = new Configuration(); 158 conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); 159 long amStartTimeEst = System.currentTimeMillis(); 160 conf.setClass( 161 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 162 MyResolver.class, DNSToSwitchMapping.class); 163 RackResolver.init(conf); 164 MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass() 165 .getName(), true); 166 app.submit(conf); 167 Job job = app.getContext().getAllJobs().values().iterator().next(); 168 JobId jobId = job.getID(); 169 LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); 170 app.waitForState(job, JobState.SUCCEEDED); 171 172 // make sure all events are flushed 173 app.waitForState(Service.STATE.STOPPED); 174 175 String jobhistoryDir = JobHistoryUtils 176 .getHistoryIntermediateDoneDirForUser(conf); 177 178 FileContext fc = null; 179 try { 180 fc = FileContext.getFileContext(conf); 181 } catch (IOException ioe) { 182 LOG.info("Can not get FileContext", ioe); 183 throw (new Exception("Can not get File Context")); 184 } 185 186 if (numMaps == numSuccessfulMaps) { 187 String summaryFileName = JobHistoryUtils 188 .getIntermediateSummaryFileName(jobId); 189 Path summaryFile = new Path(jobhistoryDir, summaryFileName); 190 String jobSummaryString = getJobSummary(fc, summaryFile); 191 Assert.assertNotNull(jobSummaryString); 192 Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); 193 Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); 194 195 Map<String, String> jobSummaryElements = new HashMap<String, String>(); 196 StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); 197 while (strToken.hasMoreTokens()) { 198 String keypair = strToken.nextToken(); 199 jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); 200 } 201 202 Assert.assertEquals("JobId does not match", jobId.toString(), 203 jobSummaryElements.get("jobId")); 204 Assert.assertEquals("JobName does not match", "test", 205 jobSummaryElements.get("jobName")); 206 Assert.assertTrue("submitTime should not be 0", 207 Long.parseLong(jobSummaryElements.get("submitTime")) != 0); 208 Assert.assertTrue("launchTime should not be 0", 209 Long.parseLong(jobSummaryElements.get("launchTime")) != 0); 210 Assert 211 .assertTrue( 212 "firstMapTaskLaunchTime should not be 0", 213 Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); 214 Assert 215 .assertTrue("firstReduceTaskLaunchTime should not be 0", 216 Long.parseLong(jobSummaryElements 217 .get("firstReduceTaskLaunchTime")) != 0); 218 Assert.assertTrue("finishTime should not be 0", 219 Long.parseLong(jobSummaryElements.get("finishTime")) != 0); 220 Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, 221 Integer.parseInt(jobSummaryElements.get("numMaps"))); 222 Assert.assertEquals("Mismatch in num reduce slots", numReduces, 223 Integer.parseInt(jobSummaryElements.get("numReduces"))); 224 Assert.assertEquals("User does not match", 225 System.getProperty("user.name"), jobSummaryElements.get("user")); 226 Assert.assertEquals("Queue does not match", "default", 227 jobSummaryElements.get("queue")); 228 Assert.assertEquals("Status does not match", "SUCCEEDED", 229 jobSummaryElements.get("status")); 230 } 231 232 JobHistory jobHistory = new JobHistory(); 233 jobHistory.init(conf); 234 HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); 235 JobInfo jobInfo; 236 long numFinishedMaps; 237 238 synchronized (fileInfo) { 239 Path historyFilePath = fileInfo.getHistoryFile(); 240 FSDataInputStream in = null; 241 LOG.info("JobHistoryFile is: " + historyFilePath); 242 try { 243 in = fc.open(fc.makeQualified(historyFilePath)); 244 } catch (IOException ioe) { 245 LOG.info("Can not open history file: " + historyFilePath, ioe); 246 throw (new Exception("Can not open History File")); 247 } 248 249 JobHistoryParser parser = new JobHistoryParser(in); 250 final EventReader realReader = new EventReader(in); 251 EventReader reader = Mockito.mock(EventReader.class); 252 if (numMaps == numSuccessfulMaps) { 253 reader = realReader; 254 } else { 255 final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! 256 Mockito.when(reader.getNextEvent()).thenAnswer( 257 new Answer<HistoryEvent>() { 258 public HistoryEvent answer(InvocationOnMock invocation) 259 throws IOException { 260 HistoryEvent event = realReader.getNextEvent(); 261 if (event instanceof TaskFinishedEvent) { 262 numFinishedEvents.incrementAndGet(); 263 } 264 265 if (numFinishedEvents.get() <= numSuccessfulMaps) { 266 return event; 267 } else { 268 throw new IOException("test"); 269 } 270 } 271 }); 272 } 273 274 jobInfo = parser.parse(reader); 275 276 numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps); 277 278 if (numFinishedMaps != numMaps) { 279 Exception parseException = parser.getParseException(); 280 Assert.assertNotNull("Didn't get expected parse exception", 281 parseException); 282 } 283 } 284 285 Assert.assertEquals("Incorrect username ", System.getProperty("user.name"), 286 jobInfo.getUsername()); 287 Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); 288 Assert.assertEquals("Incorrect queuename ", "default", 289 jobInfo.getJobQueueName()); 290 Assert 291 .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); 292 Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps, 293 numFinishedMaps); 294 Assert.assertEquals("incorrect finishedReduces ", numReduces, 295 jobInfo.getFinishedReduces()); 296 Assert.assertEquals("incorrect uberized ", job.isUber(), 297 jobInfo.getUberized()); 298 Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks(); 299 int totalTasks = allTasks.size(); 300 Assert.assertEquals("total number of tasks is incorrect ", 301 (numMaps + numReduces), totalTasks); 302 303 // Verify aminfo 304 Assert.assertEquals(1, jobInfo.getAMInfos().size()); 305 Assert.assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0) 306 .getNodeManagerHost()); 307 AMInfo amInfo = jobInfo.getAMInfos().get(0); 308 Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort()); 309 Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort()); 310 Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId()); 311 Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId() 312 .getApplicationAttemptId()); 313 Assert.assertTrue(amInfo.getStartTime() <= System.currentTimeMillis() 314 && amInfo.getStartTime() >= amStartTimeEst); 315 316 ContainerId fakeCid = MRApp.newContainerId(-1, -1, -1, -1); 317 // Assert at taskAttempt level 318 for (TaskInfo taskInfo : allTasks.values()) { 319 int taskAttemptCount = taskInfo.getAllTaskAttempts().size(); 320 Assert 321 .assertEquals("total number of task attempts ", 1, taskAttemptCount); 322 TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values() 323 .iterator().next(); 324 Assert.assertNotNull(taInfo.getContainerId()); 325 // Verify the wrong ctor is not being used. Remove after mrv1 is removed. 326 Assert.assertFalse(taInfo.getContainerId().equals(fakeCid)); 327 } 328 329 // Deep compare Job and JobInfo 330 for (Task task : job.getTasks().values()) { 331 TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); 332 Assert.assertNotNull("TaskInfo not found", taskInfo); 333 for (TaskAttempt taskAttempt : task.getAttempts().values()) { 334 TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( 335 TypeConverter.fromYarn((taskAttempt.getID()))); 336 Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo); 337 Assert.assertEquals("Incorrect shuffle port for task attempt", 338 taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort()); 339 if (numMaps == numSuccessfulMaps) { 340 Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname()); 341 Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort()); 342 343 // Verify rack-name 344 Assert.assertEquals("rack-name is incorrect", 345 taskAttemptInfo.getRackname(), RACK_NAME); 346 } 347 } 348 } 349 350 // test output for HistoryViewer 351 PrintStream stdps = System.out; 352 try { 353 System.setOut(new PrintStream(outContent)); 354 HistoryViewer viewer; 355 synchronized (fileInfo) { 356 viewer = new HistoryViewer(fc.makeQualified( 357 fileInfo.getHistoryFile()).toString(), conf, true); 358 } 359 viewer.print(); 360 361 for (TaskInfo taskInfo : allTasks.values()) { 362 363 String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo 364 .getTaskStatus()) 365 + " " 366 + taskInfo.getTaskType() 367 + " task list for " + taskInfo.getTaskId().getJobID(); 368 Assert.assertTrue(outContent.toString().indexOf(test) > 0); 369 Assert.assertTrue(outContent.toString().indexOf( 370 taskInfo.getTaskId().toString()) > 0); 371 } 372 } finally { 373 System.setOut(stdps); 374 375 } 376 } 377 378 // Computes finished maps similar to RecoveryService... computeFinishedMaps(JobInfo jobInfo, int numMaps, int numSuccessfulMaps)379 private long computeFinishedMaps(JobInfo jobInfo, int numMaps, 380 int numSuccessfulMaps) { 381 if (numMaps == numSuccessfulMaps) { 382 return jobInfo.getFinishedMaps(); 383 } 384 385 long numFinishedMaps = 0; 386 Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo 387 .getAllTasks(); 388 for (TaskInfo taskInfo : taskInfos.values()) { 389 if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { 390 ++numFinishedMaps; 391 } 392 } 393 return numFinishedMaps; 394 } 395 396 @Test(timeout = 30000) testHistoryParsingForFailedAttempts()397 public void testHistoryParsingForFailedAttempts() throws Exception { 398 LOG.info("STARTING testHistoryParsingForFailedAttempts"); 399 try { 400 Configuration conf = new Configuration(); 401 conf.setClass( 402 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 403 MyResolver.class, DNSToSwitchMapping.class); 404 RackResolver.init(conf); 405 MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this 406 .getClass().getName(), true); 407 app.submit(conf); 408 Job job = app.getContext().getAllJobs().values().iterator().next(); 409 JobId jobId = job.getID(); 410 app.waitForState(job, JobState.SUCCEEDED); 411 412 // make sure all events are flushed 413 app.waitForState(Service.STATE.STOPPED); 414 415 JobHistory jobHistory = new JobHistory(); 416 jobHistory.init(conf); 417 HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); 418 419 JobHistoryParser parser; 420 JobInfo jobInfo; 421 synchronized (fileInfo) { 422 Path historyFilePath = fileInfo.getHistoryFile(); 423 FSDataInputStream in = null; 424 FileContext fc = null; 425 try { 426 fc = FileContext.getFileContext(conf); 427 in = fc.open(fc.makeQualified(historyFilePath)); 428 } catch (IOException ioe) { 429 LOG.info("Can not open history file: " + historyFilePath, ioe); 430 throw (new Exception("Can not open History File")); 431 } 432 433 parser = new JobHistoryParser(in); 434 jobInfo = parser.parse(); 435 } 436 Exception parseException = parser.getParseException(); 437 Assert.assertNull("Caught an expected exception " + parseException, 438 parseException); 439 int noOffailedAttempts = 0; 440 Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks(); 441 for (Task task : job.getTasks().values()) { 442 TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID())); 443 for (TaskAttempt taskAttempt : task.getAttempts().values()) { 444 TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( 445 TypeConverter.fromYarn((taskAttempt.getID()))); 446 // Verify rack-name for all task attempts 447 Assert.assertEquals("rack-name is incorrect", 448 taskAttemptInfo.getRackname(), RACK_NAME); 449 if (taskAttemptInfo.getTaskStatus().equals("FAILED")) { 450 noOffailedAttempts++; 451 } 452 } 453 } 454 Assert.assertEquals("No of Failed tasks doesn't match.", 2, 455 noOffailedAttempts); 456 } finally { 457 LOG.info("FINISHED testHistoryParsingForFailedAttempts"); 458 } 459 } 460 461 @Test(timeout = 60000) testCountersForFailedTask()462 public void testCountersForFailedTask() throws Exception { 463 LOG.info("STARTING testCountersForFailedTask"); 464 try { 465 Configuration conf = new Configuration(); 466 conf.setClass( 467 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 468 MyResolver.class, DNSToSwitchMapping.class); 469 RackResolver.init(conf); 470 MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this 471 .getClass().getName(), true); 472 app.submit(conf); 473 Job job = app.getContext().getAllJobs().values().iterator().next(); 474 JobId jobId = job.getID(); 475 app.waitForState(job, JobState.FAILED); 476 477 // make sure all events are flushed 478 app.waitForState(Service.STATE.STOPPED); 479 480 JobHistory jobHistory = new JobHistory(); 481 jobHistory.init(conf); 482 483 HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); 484 485 JobHistoryParser parser; 486 JobInfo jobInfo; 487 synchronized (fileInfo) { 488 Path historyFilePath = fileInfo.getHistoryFile(); 489 FSDataInputStream in = null; 490 FileContext fc = null; 491 try { 492 fc = FileContext.getFileContext(conf); 493 in = fc.open(fc.makeQualified(historyFilePath)); 494 } catch (IOException ioe) { 495 LOG.info("Can not open history file: " + historyFilePath, ioe); 496 throw (new Exception("Can not open History File")); 497 } 498 499 parser = new JobHistoryParser(in); 500 jobInfo = parser.parse(); 501 } 502 Exception parseException = parser.getParseException(); 503 Assert.assertNull("Caught an expected exception " + parseException, 504 parseException); 505 for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) { 506 TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey()); 507 CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue()); 508 Assert.assertNotNull("completed task report has null counters", ct 509 .getReport().getCounters()); 510 } 511 final List<String> originalDiagnostics = job.getDiagnostics(); 512 final String historyError = jobInfo.getErrorInfo(); 513 assertTrue("No original diagnostics for a failed job", 514 originalDiagnostics != null && !originalDiagnostics.isEmpty()); 515 assertNotNull("No history error info for a failed job ", historyError); 516 for (String diagString : originalDiagnostics) { 517 assertTrue(historyError.contains(diagString)); 518 } 519 } finally { 520 LOG.info("FINISHED testCountersForFailedTask"); 521 } 522 } 523 524 @Test(timeout = 60000) testDiagnosticsForKilledJob()525 public void testDiagnosticsForKilledJob() throws Exception { 526 LOG.info("STARTING testDiagnosticsForKilledJob"); 527 try { 528 final Configuration conf = new Configuration(); 529 conf.setClass( 530 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 531 MyResolver.class, DNSToSwitchMapping.class); 532 RackResolver.init(conf); 533 MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this 534 .getClass().getName(), true); 535 app.submit(conf); 536 Job job = app.getContext().getAllJobs().values().iterator().next(); 537 JobId jobId = job.getID(); 538 app.waitForState(job, JobState.KILLED); 539 540 // make sure all events are flushed 541 app.waitForState(Service.STATE.STOPPED); 542 543 JobHistory jobHistory = new JobHistory(); 544 jobHistory.init(conf); 545 546 HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); 547 548 JobHistoryParser parser; 549 JobInfo jobInfo; 550 synchronized (fileInfo) { 551 Path historyFilePath = fileInfo.getHistoryFile(); 552 FSDataInputStream in = null; 553 FileContext fc = null; 554 try { 555 fc = FileContext.getFileContext(conf); 556 in = fc.open(fc.makeQualified(historyFilePath)); 557 } catch (IOException ioe) { 558 LOG.info("Can not open history file: " + historyFilePath, ioe); 559 throw (new Exception("Can not open History File")); 560 } 561 562 parser = new JobHistoryParser(in); 563 jobInfo = parser.parse(); 564 } 565 Exception parseException = parser.getParseException(); 566 assertNull("Caught an expected exception " + parseException, 567 parseException); 568 final List<String> originalDiagnostics = job.getDiagnostics(); 569 final String historyError = jobInfo.getErrorInfo(); 570 assertTrue("No original diagnostics for a failed job", 571 originalDiagnostics != null && !originalDiagnostics.isEmpty()); 572 assertNotNull("No history error info for a failed job ", historyError); 573 for (String diagString : originalDiagnostics) { 574 assertTrue(historyError.contains(diagString)); 575 } 576 assertTrue("No killed message in diagnostics", 577 historyError.contains(JobImpl.JOB_KILLED_DIAG)); 578 } finally { 579 LOG.info("FINISHED testDiagnosticsForKilledJob"); 580 } 581 } 582 583 @Test(timeout = 50000) testScanningOldDirs()584 public void testScanningOldDirs() throws Exception { 585 LOG.info("STARTING testScanningOldDirs"); 586 try { 587 Configuration conf = new Configuration(); 588 conf.setClass( 589 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 590 MyResolver.class, DNSToSwitchMapping.class); 591 RackResolver.init(conf); 592 MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), 593 true); 594 app.submit(conf); 595 Job job = app.getContext().getAllJobs().values().iterator().next(); 596 JobId jobId = job.getID(); 597 LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); 598 app.waitForState(job, JobState.SUCCEEDED); 599 600 // make sure all events are flushed 601 app.waitForState(Service.STATE.STOPPED); 602 603 HistoryFileManagerForTest hfm = new HistoryFileManagerForTest(); 604 hfm.init(conf); 605 HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); 606 Assert.assertNotNull("Unable to locate job history", fileInfo); 607 608 // force the manager to "forget" the job 609 hfm.deleteJobFromJobListCache(fileInfo); 610 final int msecPerSleep = 10; 611 int msecToSleep = 10 * 1000; 612 while (fileInfo.isMovePending() && msecToSleep > 0) { 613 Assert.assertTrue(!fileInfo.didMoveFail()); 614 msecToSleep -= msecPerSleep; 615 Thread.sleep(msecPerSleep); 616 } 617 Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0); 618 619 fileInfo = hfm.getFileInfo(jobId); 620 hfm.stop(); 621 Assert.assertNotNull("Unable to locate old job history", fileInfo); 622 Assert.assertTrue("HistoryFileManager not shutdown properly", 623 hfm.moveToDoneExecutor.isTerminated()); 624 } finally { 625 LOG.info("FINISHED testScanningOldDirs"); 626 } 627 } 628 629 static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { 630 MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart)631 public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, 632 boolean autoComplete, String testName, boolean cleanOnStart) { 633 super(maps, reduces, autoComplete, testName, cleanOnStart); 634 } 635 636 @SuppressWarnings("unchecked") 637 @Override attemptLaunched(TaskAttemptId attemptID)638 protected void attemptLaunched(TaskAttemptId attemptID) { 639 if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) { 640 getContext().getEventHandler().handle( 641 new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); 642 } else { 643 getContext().getEventHandler().handle( 644 new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); 645 } 646 } 647 } 648 649 static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory { 650 MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart)651 public MRAppWithHistoryWithFailedTask(int maps, int reduces, 652 boolean autoComplete, String testName, boolean cleanOnStart) { 653 super(maps, reduces, autoComplete, testName, cleanOnStart); 654 } 655 656 @SuppressWarnings("unchecked") 657 @Override attemptLaunched(TaskAttemptId attemptID)658 protected void attemptLaunched(TaskAttemptId attemptID) { 659 if (attemptID.getTaskId().getId() == 0) { 660 getContext().getEventHandler().handle( 661 new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); 662 } else { 663 getContext().getEventHandler().handle( 664 new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); 665 } 666 } 667 } 668 669 static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory { 670 MRAppWithHistoryWithJobKilled(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart)671 public MRAppWithHistoryWithJobKilled(int maps, int reduces, 672 boolean autoComplete, String testName, boolean cleanOnStart) { 673 super(maps, reduces, autoComplete, testName, cleanOnStart); 674 } 675 676 @SuppressWarnings("unchecked") 677 @Override attemptLaunched(TaskAttemptId attemptID)678 protected void attemptLaunched(TaskAttemptId attemptID) { 679 if (attemptID.getTaskId().getId() == 0) { 680 getContext().getEventHandler().handle( 681 new JobEvent(attemptID.getTaskId().getJobId(), 682 JobEventType.JOB_KILL)); 683 } else { 684 getContext().getEventHandler().handle( 685 new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); 686 } 687 } 688 } 689 690 static class HistoryFileManagerForTest extends HistoryFileManager { deleteJobFromJobListCache(HistoryFileInfo fileInfo)691 void deleteJobFromJobListCache(HistoryFileInfo fileInfo) { 692 jobListCache.delete(fileInfo); 693 } 694 } 695 main(String[] args)696 public static void main(String[] args) throws Exception { 697 TestJobHistoryParsing t = new TestJobHistoryParsing(); 698 t.testHistoryParsing(); 699 t.testHistoryParsingForFailedAttempts(); 700 } 701 702 /** 703 * test clean old history files. Files should be deleted after 1 week by 704 * default. 705 */ 706 @Test(timeout = 15000) testDeleteFileInfo()707 public void testDeleteFileInfo() throws Exception { 708 LOG.info("STARTING testDeleteFileInfo"); 709 try { 710 Configuration conf = new Configuration(); 711 conf.setClass( 712 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 713 MyResolver.class, DNSToSwitchMapping.class); 714 715 RackResolver.init(conf); 716 MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), 717 true); 718 app.submit(conf); 719 Job job = app.getContext().getAllJobs().values().iterator().next(); 720 JobId jobId = job.getID(); 721 722 app.waitForState(job, JobState.SUCCEEDED); 723 724 // make sure all events are flushed 725 app.waitForState(Service.STATE.STOPPED); 726 HistoryFileManager hfm = new HistoryFileManager(); 727 hfm.init(conf); 728 HistoryFileInfo fileInfo = hfm.getFileInfo(jobId); 729 hfm.initExisting(); 730 // wait for move files form the done_intermediate directory to the gone 731 // directory 732 while (fileInfo.isMovePending()) { 733 Thread.sleep(300); 734 } 735 736 Assert.assertNotNull(hfm.jobListCache.values()); 737 738 // try to remove fileInfo 739 hfm.clean(); 740 // check that fileInfo does not deleted 741 Assert.assertFalse(fileInfo.isDeleted()); 742 // correct live time 743 hfm.setMaxHistoryAge(-1); 744 hfm.clean(); 745 hfm.stop(); 746 Assert.assertTrue("Thread pool shutdown", 747 hfm.moveToDoneExecutor.isTerminated()); 748 // should be deleted ! 749 Assert.assertTrue("file should be deleted ", fileInfo.isDeleted()); 750 751 } finally { 752 LOG.info("FINISHED testDeleteFileInfo"); 753 } 754 } 755 756 /** 757 * Simple test some methods of JobHistory 758 */ 759 @Test(timeout = 20000) testJobHistoryMethods()760 public void testJobHistoryMethods() throws Exception { 761 LOG.info("STARTING testJobHistoryMethods"); 762 try { 763 Configuration configuration = new Configuration(); 764 configuration 765 .setClass( 766 NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 767 MyResolver.class, DNSToSwitchMapping.class); 768 769 RackResolver.init(configuration); 770 MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), 771 true); 772 app.submit(configuration); 773 Job job = app.getContext().getAllJobs().values().iterator().next(); 774 app.waitForState(job, JobState.SUCCEEDED); 775 776 JobHistory jobHistory = new JobHistory(); 777 jobHistory.init(configuration); 778 // Method getAllJobs 779 Assert.assertEquals(1, jobHistory.getAllJobs().size()); 780 // and with ApplicationId 781 Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size()); 782 783 JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default", 784 0L, System.currentTimeMillis() + 1, 0L, 785 System.currentTimeMillis() + 1, JobState.SUCCEEDED); 786 787 Assert.assertEquals(1, jobsinfo.getJobs().size()); 788 Assert.assertNotNull(jobHistory.getApplicationAttemptId()); 789 // test Application Id 790 Assert.assertEquals("application_0_0000", jobHistory.getApplicationID() 791 .toString()); 792 Assert 793 .assertEquals("Job History Server", jobHistory.getApplicationName()); 794 // method does not work 795 Assert.assertNull(jobHistory.getEventHandler()); 796 // method does not work 797 Assert.assertNull(jobHistory.getClock()); 798 // method does not work 799 Assert.assertNull(jobHistory.getClusterInfo()); 800 801 } finally { 802 LOG.info("FINISHED testJobHistoryMethods"); 803 } 804 } 805 806 /** 807 * Simple test PartialJob 808 */ 809 @Test(timeout = 3000) testPartialJob()810 public void testPartialJob() throws Exception { 811 JobId jobId = new JobIdPBImpl(); 812 jobId.setId(0); 813 JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user", 814 "jobName", jobId, 3, 2, "JobStatus"); 815 PartialJob test = new PartialJob(jii, jobId); 816 assertEquals(1.0f, test.getProgress(), 0.001); 817 assertNull(test.getAllCounters()); 818 assertNull(test.getTasks()); 819 assertNull(test.getTasks(TaskType.MAP)); 820 assertNull(test.getTask(new TaskIdPBImpl())); 821 822 assertNull(test.getTaskAttemptCompletionEvents(0, 100)); 823 assertNull(test.getMapAttemptCompletionEvents(0, 100)); 824 assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null)); 825 assertNull(test.getAMInfos()); 826 827 } 828 829 @Test testMultipleFailedTasks()830 public void testMultipleFailedTasks() throws Exception { 831 JobHistoryParser parser = 832 new JobHistoryParser(Mockito.mock(FSDataInputStream.class)); 833 EventReader reader = Mockito.mock(EventReader.class); 834 final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack! 835 final org.apache.hadoop.mapreduce.TaskType taskType = 836 org.apache.hadoop.mapreduce.TaskType.MAP; 837 final TaskID[] tids = new TaskID[2]; 838 final JobID jid = new JobID("1", 1); 839 tids[0] = new TaskID(jid, taskType, 0); 840 tids[1] = new TaskID(jid, taskType, 1); 841 Mockito.when(reader.getNextEvent()).thenAnswer( 842 new Answer<HistoryEvent>() { 843 public HistoryEvent answer(InvocationOnMock invocation) 844 throws IOException { 845 // send two task start and two task fail events for tasks 0 and 1 846 int eventId = numEventsRead.getAndIncrement(); 847 TaskID tid = tids[eventId & 0x1]; 848 if (eventId < 2) { 849 return new TaskStartedEvent(tid, 0, taskType, ""); 850 } 851 if (eventId < 4) { 852 TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType, 853 "failed", "FAILED", null, new Counters()); 854 tfe.setDatum(tfe.getDatum()); 855 return tfe; 856 } 857 if (eventId < 5) { 858 JobUnsuccessfulCompletionEvent juce = 859 new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0, 860 "JOB_FAILED", Collections.singletonList( 861 "Task failed: " + tids[0].toString())); 862 return juce; 863 } 864 return null; 865 } 866 }); 867 JobInfo info = parser.parse(reader); 868 assertTrue("Task 0 not implicated", 869 info.getErrorInfo().contains(tids[0].toString())); 870 } 871 872 @Test testFailedJobHistoryWithoutDiagnostics()873 public void testFailedJobHistoryWithoutDiagnostics() throws Exception { 874 final Path histPath = new Path(getClass().getClassLoader().getResource( 875 "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist") 876 .getFile()); 877 final FileSystem lfs = FileSystem.getLocal(new Configuration()); 878 final FSDataInputStream fsdis = lfs.open(histPath); 879 try { 880 JobHistoryParser parser = new JobHistoryParser(fsdis); 881 JobInfo info = parser.parse(); 882 assertEquals("History parsed jobId incorrectly", 883 info.getJobId(), JobID.forName("job_1393307629410_0001") ); 884 assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo()); 885 } finally { 886 fsdis.close(); 887 } 888 } 889 890 /** 891 * Test compatibility of JobHistoryParser with 2.0.3-alpha history files 892 * @throws IOException 893 */ 894 @Test testTaskAttemptUnsuccessfulCompletionWithoutCounters203()895 public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203() throws IOException 896 { 897 Path histPath = new Path(getClass().getClassLoader().getResource( 898 "job_2.0.3-alpha-FAILED.jhist").getFile()); 899 JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal 900 (new Configuration()), histPath); 901 JobInfo jobInfo = parser.parse(); 902 LOG.info(" job info: " + jobInfo.getJobname() + " " 903 + jobInfo.getFinishedMaps() + " " 904 + jobInfo.getTotalMaps() + " " 905 + jobInfo.getJobId() ) ; 906 } 907 908 /** 909 * Test compatibility of JobHistoryParser with 2.4.0 history files 910 * @throws IOException 911 */ 912 @Test testTaskAttemptUnsuccessfulCompletionWithoutCounters240()913 public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240() throws IOException 914 { 915 Path histPath = new Path(getClass().getClassLoader().getResource( 916 "job_2.4.0-FAILED.jhist").getFile()); 917 JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal 918 (new Configuration()), histPath); 919 JobInfo jobInfo = parser.parse(); 920 LOG.info(" job info: " + jobInfo.getJobname() + " " 921 + jobInfo.getFinishedMaps() + " " 922 + jobInfo.getTotalMaps() + " " 923 + jobInfo.getJobId() ); 924 } 925 926 /** 927 * Test compatibility of JobHistoryParser with 0.23.9 history files 928 * @throws IOException 929 */ 930 @Test testTaskAttemptUnsuccessfulCompletionWithoutCounters0239()931 public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239() throws IOException 932 { 933 Path histPath = new Path(getClass().getClassLoader().getResource( 934 "job_0.23.9-FAILED.jhist").getFile()); 935 JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal 936 (new Configuration()), histPath); 937 JobInfo jobInfo = parser.parse(); 938 LOG.info(" job info: " + jobInfo.getJobname() + " " 939 + jobInfo.getFinishedMaps() + " " 940 + jobInfo.getTotalMaps() + " " 941 + jobInfo.getJobId() ) ; 942 } 943 }