/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.hs;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
    .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.RackResolver;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestJobHistoryParsing {
  private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);

  private static final String RACK_NAME = "/MyRackName";

  private ByteArrayOutputStream outContent = new ByteArrayOutputStream();

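  /**
   * DNSToSwitchMapping stub that resolves every host to {@link #RACK_NAME},
   * so tests can assert a deterministic rack name in parsed history.
   */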
  public static class MyResolver implements DNSToSwitchMapping {
    @Override
    public List<String> resolve(List<String> names) {
      return Arrays.asList(new String[] { RACK_NAME });
    }

    @Override
    public void reloadCachedMappings() {
    }

    @Override
    public void reloadCachedMappings(List<String> names) {
    }
  }

  @Test(timeout = 50000)
  public void testJobInfo() throws Exception {
    JobInfo info = new JobInfo();
    Assert.assertEquals("NORMAL", info.getPriority());
    info.printAll();
  }

  @Test(timeout = 300000)
  public void testHistoryParsing() throws Exception {
    LOG.info("STARTING testHistoryParsing()");
    try {
      checkHistoryParsing(2, 1, 2);
    } finally {
      LOG.info("FINISHED testHistoryParsing()");
    }
  }

  @Test(timeout = 50000)
  public void testHistoryParsingWithParseErrors() throws Exception {
    LOG.info("STARTING testHistoryParsingWithParseErrors()");
    try {
      checkHistoryParsing(3, 0, 2);
    } finally {
      LOG.info("FINISHED testHistoryParsingWithParseErrors()");
    }
  }

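  /** Reads the job summary line written alongside the intermediate history file. */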
  private static String getJobSummary(FileContext fc, Path path)
      throws IOException {
    Path qPath = fc.makeQualified(path);
    FSDataInputStream in = fc.open(qPath);
    String jobSummaryString = in.readUTF();
    in.close();
    return jobSummaryString;
  }

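  /**
   * Runs an MR app to completion, then parses the generated history file and
   * verifies it against the live job. When numSuccessfulMaps is less than
   * numMaps, the event stream is truncated to force a parse error.
   */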
  private void checkHistoryParsing(final int numMaps, final int numReduces,
      final int numSuccessfulMaps) throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
    long amStartTimeEst = System.currentTimeMillis();
    conf.setClass(
        NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
        MyResolver.class, DNSToSwitchMapping.class);
    RackResolver.init(conf);
    MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
        .getName(), true);
    app.submit(conf);
    Job job = app.getContext().getAllJobs().values().iterator().next();
    JobId jobId = job.getID();
    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
    app.waitForState(job, JobState.SUCCEEDED);

    // make sure all events are flushed
    app.waitForState(Service.STATE.STOPPED);

    String jobhistoryDir = JobHistoryUtils
        .getHistoryIntermediateDoneDirForUser(conf);

    FileContext fc = null;
    try {
      fc = FileContext.getFileContext(conf);
    } catch (IOException ioe) {
      LOG.info("Can not get FileContext", ioe);
      throw (new Exception("Can not get File Context"));
    }

    if (numMaps == numSuccessfulMaps) {
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobId);
      Path summaryFile = new Path(jobhistoryDir, summaryFileName);
      String jobSummaryString = getJobSummary(fc, summaryFile);
      Assert.assertNotNull(jobSummaryString);
      Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
      Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));

      Map<String, String> jobSummaryElements = new HashMap<String, String>();
      StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
      while (strToken.hasMoreTokens()) {
        String keypair = strToken.nextToken();
        jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
      }

      Assert.assertEquals("JobId does not match", jobId.toString(),
          jobSummaryElements.get("jobId"));
      Assert.assertEquals("JobName does not match", "test",
          jobSummaryElements.get("jobName"));
      Assert.assertTrue("submitTime should not be 0",
          Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
      Assert.assertTrue("launchTime should not be 0",
          Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
      Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
          Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
      Assert.assertTrue("firstReduceTaskLaunchTime should not be 0",
          Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
      Assert.assertTrue("finishTime should not be 0",
          Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
      Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
          Integer.parseInt(jobSummaryElements.get("numMaps")));
      Assert.assertEquals("Mismatch in num reduce slots", numReduces,
          Integer.parseInt(jobSummaryElements.get("numReduces")));
      Assert.assertEquals("User does not match",
          System.getProperty("user.name"), jobSummaryElements.get("user"));
      Assert.assertEquals("Queue does not match", "default",
          jobSummaryElements.get("queue"));
      Assert.assertEquals("Status does not match", "SUCCEEDED",
          jobSummaryElements.get("status"));
    }

    JobHistory jobHistory = new JobHistory();
    jobHistory.init(conf);
    HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
    JobInfo jobInfo;
    long numFinishedMaps;

    synchronized (fileInfo) {
      Path historyFilePath = fileInfo.getHistoryFile();
      FSDataInputStream in = null;
      LOG.info("JobHistoryFile is: " + historyFilePath);
      try {
        in = fc.open(fc.makeQualified(historyFilePath));
      } catch (IOException ioe) {
        LOG.info("Can not open history file: " + historyFilePath, ioe);
        throw (new Exception("Can not open History File"));
      }

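      // When simulating parse errors, wrap the real reader in a mock that
      // throws an IOException once all successful map finish events have
      // been delivered, truncating the event stream mid-parse.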
      JobHistoryParser parser = new JobHistoryParser(in);
      final EventReader realReader = new EventReader(in);
      EventReader reader = Mockito.mock(EventReader.class);
      if (numMaps == numSuccessfulMaps) {
        reader = realReader;
      } else {
        final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
        Mockito.when(reader.getNextEvent()).thenAnswer(
            new Answer<HistoryEvent>() {
              public HistoryEvent answer(InvocationOnMock invocation)
                  throws IOException {
                HistoryEvent event = realReader.getNextEvent();
                if (event instanceof TaskFinishedEvent) {
                  numFinishedEvents.incrementAndGet();
                }

                if (numFinishedEvents.get() <= numSuccessfulMaps) {
                  return event;
                } else {
                  throw new IOException("test");
                }
              }
            });
      }

      jobInfo = parser.parse(reader);

      numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);

      if (numFinishedMaps != numMaps) {
        Exception parseException = parser.getParseException();
        Assert.assertNotNull("Didn't get expected parse exception",
            parseException);
      }
    }

    Assert.assertEquals("Incorrect username", System.getProperty("user.name"),
        jobInfo.getUsername());
    Assert.assertEquals("Incorrect jobName", "test", jobInfo.getJobname());
    Assert.assertEquals("Incorrect queuename", "default",
        jobInfo.getJobQueueName());
    Assert.assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
    Assert.assertEquals("incorrect finishedMap", numSuccessfulMaps,
        numFinishedMaps);
    Assert.assertEquals("incorrect finishedReduces", numReduces,
        jobInfo.getFinishedReduces());
    Assert.assertEquals("incorrect uberized", job.isUber(),
        jobInfo.getUberized());
    Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
    int totalTasks = allTasks.size();
    Assert.assertEquals("total number of tasks is incorrect",
        (numMaps + numReduces), totalTasks);

    // Verify aminfo
    Assert.assertEquals(1, jobInfo.getAMInfos().size());
    Assert.assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0)
        .getNodeManagerHost());
    AMInfo amInfo = jobInfo.getAMInfos().get(0);
    Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
    Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
    Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
    Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
        .getApplicationAttemptId());
    Assert.assertTrue(amInfo.getStartTime() <= System.currentTimeMillis()
        && amInfo.getStartTime() >= amStartTimeEst);

    ContainerId fakeCid = MRApp.newContainerId(-1, -1, -1, -1);
    // Assert at taskAttempt level
    for (TaskInfo taskInfo : allTasks.values()) {
      int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
      Assert.assertEquals("total number of task attempts", 1,
          taskAttemptCount);
      TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values()
          .iterator().next();
      Assert.assertNotNull(taInfo.getContainerId());
      // Verify the wrong ctor is not being used. Remove after mrv1 is removed.
      Assert.assertFalse(taInfo.getContainerId().equals(fakeCid));
    }

    // Deep compare Job and JobInfo
    for (Task task : job.getTasks().values()) {
      TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
      Assert.assertNotNull("TaskInfo not found", taskInfo);
      for (TaskAttempt taskAttempt : task.getAttempts().values()) {
        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
            TypeConverter.fromYarn((taskAttempt.getID())));
        Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
        Assert.assertEquals("Incorrect shuffle port for task attempt",
            taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
        if (numMaps == numSuccessfulMaps) {
          Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
          Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());

          // Verify rack-name
          Assert.assertEquals("rack-name is incorrect",
              taskAttemptInfo.getRackname(), RACK_NAME);
        }
      }
    }

    // test output for HistoryViewer
    PrintStream stdps = System.out;
    try {
      System.setOut(new PrintStream(outContent));
      HistoryViewer viewer;
      synchronized (fileInfo) {
        viewer = new HistoryViewer(fc.makeQualified(
            fileInfo.getHistoryFile()).toString(), conf, true);
      }
      viewer.print();

      for (TaskInfo taskInfo : allTasks.values()) {
        String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo
            .getTaskStatus())
            + " "
            + taskInfo.getTaskType()
            + " task list for " + taskInfo.getTaskId().getJobID();
        Assert.assertTrue(outContent.toString().indexOf(test) > 0);
        Assert.assertTrue(outContent.toString().indexOf(
            taskInfo.getTaskId().toString()) > 0);
      }
    } finally {
      System.setOut(stdps);
    }
  }

  // Computes finished maps similar to RecoveryService...
  private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
      int numSuccessfulMaps) {
    if (numMaps == numSuccessfulMaps) {
      return jobInfo.getFinishedMaps();
    }

    long numFinishedMaps = 0;
    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
        .getAllTasks();
    for (TaskInfo taskInfo : taskInfos.values()) {
      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
        ++numFinishedMaps;
      }
    }
    return numFinishedMaps;
  }

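  /**
   * Verifies that rack names are recorded in history for both successful
   * and failed task attempts.
   */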
  @Test(timeout = 30000)
  public void testHistoryParsingForFailedAttempts() throws Exception {
    LOG.info("STARTING testHistoryParsingForFailedAttempts");
    try {
      Configuration conf = new Configuration();
      conf.setClass(
          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);
      RackResolver.init(conf);
      MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
          .getClass().getName(), true);
      app.submit(conf);
      Job job = app.getContext().getAllJobs().values().iterator().next();
      JobId jobId = job.getID();
      app.waitForState(job, JobState.SUCCEEDED);

      // make sure all events are flushed
      app.waitForState(Service.STATE.STOPPED);

      JobHistory jobHistory = new JobHistory();
      jobHistory.init(conf);
      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);

      JobHistoryParser parser;
      JobInfo jobInfo;
      synchronized (fileInfo) {
        Path historyFilePath = fileInfo.getHistoryFile();
        FSDataInputStream in = null;
        FileContext fc = null;
        try {
          fc = FileContext.getFileContext(conf);
          in = fc.open(fc.makeQualified(historyFilePath));
        } catch (IOException ioe) {
          LOG.info("Can not open history file: " + historyFilePath, ioe);
          throw (new Exception("Can not open History File"));
        }

        parser = new JobHistoryParser(in);
        jobInfo = parser.parse();
      }
      Exception parseException = parser.getParseException();
      Assert.assertNull("Caught an unexpected exception " + parseException,
          parseException);
      int numFailedAttempts = 0;
      Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
      for (Task task : job.getTasks().values()) {
        TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
        for (TaskAttempt taskAttempt : task.getAttempts().values()) {
          TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
              TypeConverter.fromYarn((taskAttempt.getID())));
          // Verify rack-name for all task attempts
          Assert.assertEquals("rack-name is incorrect",
              taskAttemptInfo.getRackname(), RACK_NAME);
          if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
            numFailedAttempts++;
          }
        }
      }
      Assert.assertEquals("Number of failed attempts doesn't match.", 2,
          numFailedAttempts);
    } finally {
      LOG.info("FINISHED testHistoryParsingForFailedAttempts");
    }
  }

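  /**
   * Verifies that task reports for a failed job still carry counters and
   * that the job's diagnostics are preserved in the history error info.
   */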
  @Test(timeout = 60000)
  public void testCountersForFailedTask() throws Exception {
    LOG.info("STARTING testCountersForFailedTask");
    try {
      Configuration conf = new Configuration();
      conf.setClass(
          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);
      RackResolver.init(conf);
      MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
          .getClass().getName(), true);
      app.submit(conf);
      Job job = app.getContext().getAllJobs().values().iterator().next();
      JobId jobId = job.getID();
      app.waitForState(job, JobState.FAILED);

      // make sure all events are flushed
      app.waitForState(Service.STATE.STOPPED);

      JobHistory jobHistory = new JobHistory();
      jobHistory.init(conf);

      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);

      JobHistoryParser parser;
      JobInfo jobInfo;
      synchronized (fileInfo) {
        Path historyFilePath = fileInfo.getHistoryFile();
        FSDataInputStream in = null;
        FileContext fc = null;
        try {
          fc = FileContext.getFileContext(conf);
          in = fc.open(fc.makeQualified(historyFilePath));
        } catch (IOException ioe) {
          LOG.info("Can not open history file: " + historyFilePath, ioe);
          throw (new Exception("Can not open History File"));
        }

        parser = new JobHistoryParser(in);
        jobInfo = parser.parse();
      }
      Exception parseException = parser.getParseException();
      Assert.assertNull("Caught an unexpected exception " + parseException,
          parseException);
      for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
        TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
        CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
        Assert.assertNotNull("completed task report has null counters", ct
            .getReport().getCounters());
      }
      final List<String> originalDiagnostics = job.getDiagnostics();
      final String historyError = jobInfo.getErrorInfo();
      assertTrue("No original diagnostics for a failed job",
          originalDiagnostics != null && !originalDiagnostics.isEmpty());
      assertNotNull("No history error info for a failed job", historyError);
      for (String diagString : originalDiagnostics) {
        assertTrue(historyError.contains(diagString));
      }
    } finally {
      LOG.info("FINISHED testCountersForFailedTask");
    }
  }

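  /**
   * Verifies that the history of a killed job records the original
   * diagnostics, including the standard job-killed message.
   */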
  @Test(timeout = 60000)
  public void testDiagnosticsForKilledJob() throws Exception {
    LOG.info("STARTING testDiagnosticsForKilledJob");
    try {
      final Configuration conf = new Configuration();
      conf.setClass(
          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);
      RackResolver.init(conf);
      MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this
          .getClass().getName(), true);
      app.submit(conf);
      Job job = app.getContext().getAllJobs().values().iterator().next();
      JobId jobId = job.getID();
      app.waitForState(job, JobState.KILLED);

      // make sure all events are flushed
      app.waitForState(Service.STATE.STOPPED);

      JobHistory jobHistory = new JobHistory();
      jobHistory.init(conf);

      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);

      JobHistoryParser parser;
      JobInfo jobInfo;
      synchronized (fileInfo) {
        Path historyFilePath = fileInfo.getHistoryFile();
        FSDataInputStream in = null;
        FileContext fc = null;
        try {
          fc = FileContext.getFileContext(conf);
          in = fc.open(fc.makeQualified(historyFilePath));
        } catch (IOException ioe) {
          LOG.info("Can not open history file: " + historyFilePath, ioe);
          throw (new Exception("Can not open History File"));
        }

        parser = new JobHistoryParser(in);
        jobInfo = parser.parse();
      }
      Exception parseException = parser.getParseException();
      assertNull("Caught an unexpected exception " + parseException,
          parseException);
      final List<String> originalDiagnostics = job.getDiagnostics();
      final String historyError = jobInfo.getErrorInfo();
      assertTrue("No original diagnostics for a killed job",
          originalDiagnostics != null && !originalDiagnostics.isEmpty());
      assertNotNull("No history error info for a killed job", historyError);
      for (String diagString : originalDiagnostics) {
        assertTrue(historyError.contains(diagString));
      }
      assertTrue("No killed message in diagnostics",
          historyError.contains(JobImpl.JOB_KILLED_DIAG));
    } finally {
      LOG.info("FINISHED testDiagnosticsForKilledJob");
    }
  }

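  /**
   * Verifies that a job evicted from the job list cache can still be found
   * by scanning the done directories, and that the manager shuts down its
   * move-to-done executor cleanly.
   */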
  @Test(timeout = 50000)
  public void testScanningOldDirs() throws Exception {
    LOG.info("STARTING testScanningOldDirs");
    try {
      Configuration conf = new Configuration();
      conf.setClass(
          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);
      RackResolver.init(conf);
      MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
          true);
      app.submit(conf);
      Job job = app.getContext().getAllJobs().values().iterator().next();
      JobId jobId = job.getID();
      LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
      app.waitForState(job, JobState.SUCCEEDED);

      // make sure all events are flushed
      app.waitForState(Service.STATE.STOPPED);

      HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
      hfm.init(conf);
      HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
      Assert.assertNotNull("Unable to locate job history", fileInfo);

      // force the manager to "forget" the job
      hfm.deleteJobFromJobListCache(fileInfo);
      final int msecPerSleep = 10;
      int msecToSleep = 10 * 1000;
      while (fileInfo.isMovePending() && msecToSleep > 0) {
        Assert.assertFalse(fileInfo.didMoveFail());
        msecToSleep -= msecPerSleep;
        Thread.sleep(msecPerSleep);
      }
      Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);

      fileInfo = hfm.getFileInfo(jobId);
      hfm.stop();
      Assert.assertNotNull("Unable to locate old job history", fileInfo);
      Assert.assertTrue("HistoryFileManager not shutdown properly",
          hfm.moveToDoneExecutor.isTerminated());
    } finally {
      LOG.info("FINISHED testScanningOldDirs");
    }
  }

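  /**
   * MRApp variant whose first attempt of the first task fails; every other
   * attempt completes normally.
   */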
  static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {

    public MRAppWithHistoryWithFailedAttempt(int maps, int reduces,
        boolean autoComplete, String testName, boolean cleanOnStart) {
      super(maps, reduces, autoComplete, testName, cleanOnStart);
    }

    @SuppressWarnings("unchecked")
    @Override
    protected void attemptLaunched(TaskAttemptId attemptID) {
      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
      } else {
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
      }
    }
  }

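  /**
   * MRApp variant in which every attempt of the first task fails, so the
   * job itself eventually fails.
   */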
  static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {

    public MRAppWithHistoryWithFailedTask(int maps, int reduces,
        boolean autoComplete, String testName, boolean cleanOnStart) {
      super(maps, reduces, autoComplete, testName, cleanOnStart);
    }

    @SuppressWarnings("unchecked")
    @Override
    protected void attemptLaunched(TaskAttemptId attemptID) {
      if (attemptID.getTaskId().getId() == 0) {
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
      } else {
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
      }
    }
  }

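  /**
   * MRApp variant that kills the whole job as soon as the first task's
   * attempt launches.
   */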
  static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory {

    public MRAppWithHistoryWithJobKilled(int maps, int reduces,
        boolean autoComplete, String testName, boolean cleanOnStart) {
      super(maps, reduces, autoComplete, testName, cleanOnStart);
    }

    @SuppressWarnings("unchecked")
    @Override
    protected void attemptLaunched(TaskAttemptId attemptID) {
      if (attemptID.getTaskId().getId() == 0) {
        getContext().getEventHandler().handle(
            new JobEvent(attemptID.getTaskId().getJobId(),
                JobEventType.JOB_KILL));
      } else {
        getContext().getEventHandler().handle(
            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
      }
    }
  }

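  /**
   * Exposes deletion from the job list cache so tests can force the manager
   * to "forget" a job and fall back to scanning old directories.
   */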
  static class HistoryFileManagerForTest extends HistoryFileManager {
    void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
      jobListCache.delete(fileInfo);
    }
  }

  public static void main(String[] args) throws Exception {
    TestJobHistoryParsing t = new TestJobHistoryParsing();
    t.testHistoryParsing();
    t.testHistoryParsingForFailedAttempts();
  }

  /**
   * Test cleaning of old history files. Files should be deleted after 1 week
   * by default.
   */
  @Test(timeout = 15000)
  public void testDeleteFileInfo() throws Exception {
    LOG.info("STARTING testDeleteFileInfo");
    try {
      Configuration conf = new Configuration();
      conf.setClass(
          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);

      RackResolver.init(conf);
      MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
          true);
      app.submit(conf);
      Job job = app.getContext().getAllJobs().values().iterator().next();
      JobId jobId = job.getID();

      app.waitForState(job, JobState.SUCCEEDED);

      // make sure all events are flushed
      app.waitForState(Service.STATE.STOPPED);
      HistoryFileManager hfm = new HistoryFileManager();
      hfm.init(conf);
      HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
      hfm.initExisting();
      // wait for the files to move from the done_intermediate directory to
      // the done directory
      while (fileInfo.isMovePending()) {
        Thread.sleep(300);
      }

      Assert.assertNotNull(hfm.jobListCache.values());

      // try to remove fileInfo
      hfm.clean();
      // check that fileInfo has not been deleted yet
      Assert.assertFalse(fileInfo.isDeleted());
      // lower the maximum history age so the file becomes eligible for
      // deletion
      hfm.setMaxHistoryAge(-1);
      hfm.clean();
      hfm.stop();
      Assert.assertTrue("Thread pool did not shut down",
          hfm.moveToDoneExecutor.isTerminated());
      // now the file should be deleted
      Assert.assertTrue("file should be deleted", fileInfo.isDeleted());
    } finally {
      LOG.info("FINISHED testDeleteFileInfo");
    }
  }

  /**
   * Simple test of some methods of JobHistory
   */
  @Test(timeout = 20000)
  public void testJobHistoryMethods() throws Exception {
    LOG.info("STARTING testJobHistoryMethods");
    try {
      Configuration configuration = new Configuration();
      configuration.setClass(
          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);

      RackResolver.init(configuration);
      MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
          true);
      app.submit(configuration);
      Job job = app.getContext().getAllJobs().values().iterator().next();
      app.waitForState(job, JobState.SUCCEEDED);

      JobHistory jobHistory = new JobHistory();
      jobHistory.init(configuration);
      // Method getAllJobs
      Assert.assertEquals(1, jobHistory.getAllJobs().size());
      // and with ApplicationId
      Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());

      JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
          0L, System.currentTimeMillis() + 1, 0L,
          System.currentTimeMillis() + 1, JobState.SUCCEEDED);

      Assert.assertEquals(1, jobsinfo.getJobs().size());
      Assert.assertNotNull(jobHistory.getApplicationAttemptId());
      // test Application Id
      Assert.assertEquals("application_0_0000", jobHistory.getApplicationID()
          .toString());
      Assert.assertEquals("Job History Server",
          jobHistory.getApplicationName());
      // the following methods are not implemented by JobHistory and are
      // expected to return null
      Assert.assertNull(jobHistory.getEventHandler());
      Assert.assertNull(jobHistory.getClock());
      Assert.assertNull(jobHistory.getClusterInfo());
    } finally {
      LOG.info("FINISHED testJobHistoryMethods");
    }
  }

  /**
   * Simple test of PartialJob
   */
  @Test(timeout = 3000)
  public void testPartialJob() throws Exception {
    JobId jobId = new JobIdPBImpl();
    jobId.setId(0);
    JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
        "jobName", jobId, 3, 2, "JobStatus");
    PartialJob test = new PartialJob(jii, jobId);
    assertEquals(1.0f, test.getProgress(), 0.001);
    assertNull(test.getAllCounters());
    assertNull(test.getTasks());
    assertNull(test.getTasks(TaskType.MAP));
    assertNull(test.getTask(new TaskIdPBImpl()));

    assertNull(test.getTaskAttemptCompletionEvents(0, 100));
    assertNull(test.getMapAttemptCompletionEvents(0, 100));
    assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
    assertNull(test.getAMInfos());
  }

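  /**
   * Verifies that when multiple tasks fail, the parsed error info from the
   * JobUnsuccessfulCompletionEvent names the task reported in the
   * diagnostics.
   */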
  @Test
  public void testMultipleFailedTasks() throws Exception {
    JobHistoryParser parser =
        new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
    EventReader reader = Mockito.mock(EventReader.class);
    final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
    final org.apache.hadoop.mapreduce.TaskType taskType =
        org.apache.hadoop.mapreduce.TaskType.MAP;
    final TaskID[] tids = new TaskID[2];
    final JobID jid = new JobID("1", 1);
    tids[0] = new TaskID(jid, taskType, 0);
    tids[1] = new TaskID(jid, taskType, 1);
    Mockito.when(reader.getNextEvent()).thenAnswer(
        new Answer<HistoryEvent>() {
          public HistoryEvent answer(InvocationOnMock invocation)
              throws IOException {
            // send two task start and two task fail events for tasks 0 and 1
            int eventId = numEventsRead.getAndIncrement();
            TaskID tid = tids[eventId & 0x1];
            if (eventId < 2) {
              return new TaskStartedEvent(tid, 0, taskType, "");
            }
            if (eventId < 4) {
              TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                  "failed", "FAILED", null, new Counters());
              tfe.setDatum(tfe.getDatum());
              return tfe;
            }
            if (eventId < 5) {
              JobUnsuccessfulCompletionEvent juce =
                  new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                      "JOB_FAILED", Collections.singletonList(
                          "Task failed: " + tids[0].toString()));
              return juce;
            }
            return null;
          }
        });
    JobInfo info = parser.parse(reader);
    assertTrue("Task 0 not implicated",
        info.getErrorInfo().contains(tids[0].toString()));
  }

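  /**
   * Verifies that a failed job history file without diagnostics parses
   * cleanly and yields an empty error info string.
   */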
  @Test
  public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
    final Path histPath = new Path(getClass().getClassLoader().getResource(
        "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
        .getFile());
    final FileSystem lfs = FileSystem.getLocal(new Configuration());
    final FSDataInputStream fsdis = lfs.open(histPath);
    try {
      JobHistoryParser parser = new JobHistoryParser(fsdis);
      JobInfo info = parser.parse();
      assertEquals("History parsed jobId incorrectly",
          info.getJobId(), JobID.forName("job_1393307629410_0001"));
      assertEquals("Default diagnostics incorrect", "", info.getErrorInfo());
    } finally {
      fsdis.close();
    }
  }

  /**
   * Test compatibility of JobHistoryParser with 2.0.3-alpha history files
   * @throws IOException
   */
  @Test
  public void testTaskAttemptUnsuccessfulCompletionWithoutCounters203()
      throws IOException {
    Path histPath = new Path(getClass().getClassLoader().getResource(
        "job_2.0.3-alpha-FAILED.jhist").getFile());
    JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
        (new Configuration()), histPath);
    JobInfo jobInfo = parser.parse();
    LOG.info(" job info: " + jobInfo.getJobname() + " "
        + jobInfo.getFinishedMaps() + " "
        + jobInfo.getTotalMaps() + " "
        + jobInfo.getJobId());
  }

  /**
   * Test compatibility of JobHistoryParser with 2.4.0 history files
   * @throws IOException
   */
  @Test
  public void testTaskAttemptUnsuccessfulCompletionWithoutCounters240()
      throws IOException {
    Path histPath = new Path(getClass().getClassLoader().getResource(
        "job_2.4.0-FAILED.jhist").getFile());
    JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
        (new Configuration()), histPath);
    JobInfo jobInfo = parser.parse();
    LOG.info(" job info: " + jobInfo.getJobname() + " "
        + jobInfo.getFinishedMaps() + " "
        + jobInfo.getTotalMaps() + " "
        + jobInfo.getJobId());
  }

  /**
   * Test compatibility of JobHistoryParser with 0.23.9 history files
   * @throws IOException
   */
  @Test
  public void testTaskAttemptUnsuccessfulCompletionWithoutCounters0239()
      throws IOException {
    Path histPath = new Path(getClass().getClassLoader().getResource(
        "job_0.23.9-FAILED.jhist").getFile());
    JobHistoryParser parser = new JobHistoryParser(FileSystem.getLocal
        (new Configuration()), histPath);
    JobInfo jobInfo = parser.parse();
    LOG.info(" job info: " + jobInfo.getJobname() + " "
        + jobInfo.getFinishedMaps() + " "
        + jobInfo.getTotalMaps() + " "
        + jobInfo.getJobId());
  }
}