/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;

import org.junit.Assert;

import org.apache.avro.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

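/**
 * End-to-end test that runs a SleepJob on a MiniMRYarnCluster and then
 * verifies the job data served by the JobHistoryServer once the job has
 * finished.
 */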
public class TestMRJobsWithHistoryService {

  private static final Log LOG =
    LogFactory.getLog(TestMRJobsWithHistoryService.class);

  private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
    EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);

  private static MiniMRYarnCluster mrCluster;

  private static Configuration conf = new Configuration();
  private static FileSystem localFs;
  static {
    try {
      localFs = FileSystem.getLocal(conf);
    } catch (IOException io) {
      throw new RuntimeException("problem getting local fs", io);
    }
  }

  private static Path TEST_ROOT_DIR = new Path("target",
      TestMRJobs.class.getName() + "-tmpDir").makeQualified(localFs);
  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");

  @Before
  public void setup() throws InterruptedException, IOException {

    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
               + " not found. Not running test.");
      return;
    }

    if (mrCluster == null) {
      mrCluster = new MiniMRYarnCluster(getClass().getName());
      mrCluster.init(new Configuration());
      mrCluster.start();
    }

    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
    // work around the lack of a public distributed cache.
    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
    localFs.setPermission(APP_JAR, new FsPermission("700"));
  }

  @After
  public void tearDown() {
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    if (mrCluster != null) {
      mrCluster.stop();
    }
  }

  @Test (timeout = 90000)
  public void testJobHistoryData() throws IOException, InterruptedException,
      AvroRemoteException, ClassNotFoundException {
    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
          + " not found. Not running test.");
      return;
    }

    SleepJob sleepJob = new SleepJob();
    sleepJob.setConf(mrCluster.getConfig());
    // Job with 3 maps and 2 reduces
    Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
    job.setJarByClass(SleepJob.class);
    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
    job.waitForCompletion(true);
    Counters counterMR = job.getCounters();
    JobId jobId = TypeConverter.toYarn(job.getJobID());
    ApplicationId appID = jobId.getAppId();
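    // Poll the RM until the application reaches a terminal state, giving up
    // after roughly 60 seconds.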
    int pollElapsed = 0;
    while (true) {
      Thread.sleep(1000);
      pollElapsed += 1000;

      if (TERMINAL_RM_APP_STATES.contains(
          mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
          .getState())) {
        break;
      }

      if (pollElapsed >= 60000) {
        LOG.warn("application did not reach terminal state within 60 seconds");
        break;
      }
    }
    Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
      .getRMContext().getRMApps().get(appID).getState());
    Counters counterHS = job.getCounters();
    // TODO: the assert below passes; decide whether to compare each field
    // individually or convert to v2 counters and compare those instead.
    LOG.info("CounterHS " + counterHS);
    LOG.info("CounterMR " + counterMR);
    Assert.assertEquals(counterHS, counterMR);

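    // Fetch the job report for this job directly from the JobHistoryServer
    // and sanity-check its contents.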
    HSClientProtocol historyClient = instantiateHistoryProxy();
    GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
    gjReq.setJobId(jobId);
    JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
    verifyJobReport(jobReport, jobId);
  }

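  /**
   * Verifies that the job report contains a single AM attempt with the
   * expected attempt and container ids, and that the submit, start and
   * finish timestamps are present and consistently ordered.
   */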
  private void verifyJobReport(JobReport jobReport, JobId jobId) {
    List<AMInfo> amInfos = jobReport.getAMInfos();
    Assert.assertEquals(1, amInfos.size());
    AMInfo amInfo = amInfos.get(0);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(jobId.getAppId(), 1);
    ContainerId amContainerId = ContainerId.newContainerId(appAttemptId, 1);
    Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
    Assert.assertEquals(amContainerId, amInfo.getContainerId());
    Assert.assertTrue(jobReport.getSubmitTime() > 0);
    Assert.assertTrue(jobReport.getStartTime() > 0
        && jobReport.getStartTime() >= jobReport.getSubmitTime());
    Assert.assertTrue(jobReport.getFinishTime() > 0
        && jobReport.getFinishTime() >= jobReport.getStartTime());
  }

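  /**
   * Builds an RPC proxy to the JobHistoryServer's HSClientProtocol using the
   * history server address from the MiniMRYarnCluster configuration.
   */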
  private HSClientProtocol instantiateHistoryProxy() {
    final String serviceAddr =
        mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
    final YarnRPC rpc = YarnRPC.create(conf);
    HSClientProtocol historyClient =
        (HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
            NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig());
    return historyClient;
  }
}