/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;

import org.junit.Assert;

import org.apache.avro.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Runs a {@link SleepJob} on a {@link MiniMRYarnCluster} and then checks the
 * job report obtained through an {@link HSClientProtocol} proxy created for
 * the address configured under {@link JHAdminConfig#MR_HISTORY_ADDRESS}.
 *
 * <p>Every entry point returns early (after logging) when the file named by
 * {@code MiniMRYarnCluster.APPJAR} does not exist, so the test silently
 * passes in environments where that jar has not been built.
 */
public class TestMRJobsWithHistoryService {

  private static final Log LOG =
    LogFactory.getLog(TestMRJobsWithHistoryService.class);

  /** RM application states after which polling stops. */
  private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
    EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);

  /** Delay between two successive RM application state checks. */
  private static final int POLL_INTERVAL_MS = 1000;

  /** Upper bound on the total time spent polling the RM application state. */
  private static final int POLL_TIMEOUT_MS = 60000;

  /** Cluster shared through a static field; created in setup(), stopped in tearDown(). */
  private static MiniMRYarnCluster mrCluster;

  private static final Configuration conf = new Configuration();
  private static FileSystem localFs;
  static {
    try {
      localFs = FileSystem.getLocal(conf);
    } catch (IOException io) {
      throw new RuntimeException("problem getting local fs", io);
    }
  }

  // NOTE(review): the directory name is derived from TestMRJobs rather than
  // from this class -- looks like a copy/paste leftover; confirm whether
  // sharing the directory with TestMRJobs is intended before changing it.
  private static final Path TEST_ROOT_DIR = new Path("target",
      TestMRJobs.class.getName() + "-tmpDir").makeQualified(localFs);
  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");

  /**
   * Starts the mini cluster if none is active and copies the MR application
   * jar to {@link #APP_JAR} with permission 700.
   *
   * @throws InterruptedException declared for callers; propagated unchanged
   * @throws IOException if copying the jar or setting its permission fails
   */
  @Before
  public void setup() throws InterruptedException, IOException {

    if (isAppJarMissing()) {
      return;
    }

    if (mrCluster == null) {
      mrCluster = new MiniMRYarnCluster(getClass().getName());
      mrCluster.init(new Configuration());
      mrCluster.start();
    }

    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
    // workaround the absent public discache.
    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
    localFs.setPermission(APP_JAR, new FsPermission("700"));
  }

  /**
   * Stops the mini cluster, if one is active, and clears the static reference
   * so that the next {@link #setup()} call creates a fresh cluster.
   */
  @After
  public void tearDown() {
    if (isAppJarMissing()) {
      return;
    }

    if (mrCluster != null) {
      mrCluster.stop();
      // setup() only creates a cluster when the field is null; without this
      // reset a subsequent test would run against the stopped instance.
      mrCluster = null;
    }
  }

  /**
   * Submits a sleep job with 3 maps and 2 reduces, waits for the RM
   * application to reach a terminal state, compares the counters read before
   * and after that wait, and verifies the job report returned by the history
   * service.
   *
   * @throws IOException on job submission or RPC failure
   * @throws InterruptedException if interrupted while waiting or polling
   * @throws AvroRemoteException declared for compatibility with callers
   * @throws ClassNotFoundException propagated from job submission
   */
  @Test (timeout = 90000)
  public void testJobHistoryData() throws IOException, InterruptedException,
      AvroRemoteException, ClassNotFoundException {
    if (isAppJarMissing()) {
      return;
    }

    SleepJob sleepJob = new SleepJob();
    sleepJob.setConf(mrCluster.getConfig());
    // Job with 3 maps and 2 reduces
    Job job = sleepJob.createJob(3, 2, 1000, 1, 500, 1);
    job.setJarByClass(SleepJob.class);
    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
    // Fail right here if the job itself failed, instead of tripping over a
    // less obvious assertion further down.
    Assert.assertTrue("SleepJob did not complete successfully",
        job.waitForCompletion(true));
    Counters counterMR = job.getCounters();
    JobId jobId = TypeConverter.toYarn(job.getJobID());
    ApplicationId appID = jobId.getAppId();

    waitForTerminalState(appID);
    Assert.assertEquals(RMAppState.FINISHED, getRMAppState(appID));

    Counters counterHS = job.getCounters();
    //TODO the Assert below worked. need to check
    //Should we compare each field or convert to V2 counter and compare
    LOG.info("CounterHS " + counterHS);
    LOG.info("CounterMR " + counterMR);
    // JUnit order is (expected, actual): the counters read right after job
    // completion are the reference, the later read is the value under test.
    Assert.assertEquals(counterMR, counterHS);

    HSClientProtocol historyClient = instantiateHistoryProxy();
    GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
    gjReq.setJobId(jobId);
    JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
    verifyJobReport(jobReport, jobId);
  }

  /**
   * Logs and reports whether the file named by
   * {@code MiniMRYarnCluster.APPJAR} is absent.
   *
   * @return {@code true} if the jar does not exist and the caller should
   *         return without doing any work
   */
  private static boolean isAppJarMissing() {
    if (new File(MiniMRYarnCluster.APPJAR).exists()) {
      return false;
    }
    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
        + " not found. Not running test.");
    return true;
  }

  /**
   * Looks up the current state of an application in the resource manager's
   * RM context.
   *
   * @param appId application to look up
   * @return the state currently reported for {@code appId}
   */
  private RMAppState getRMAppState(ApplicationId appId) {
    return mrCluster.getResourceManager().getRMContext().getRMApps()
        .get(appId).getState();
  }

  /**
   * Polls the application state once per {@link #POLL_INTERVAL_MS} until it
   * is one of {@link #TERMINAL_RM_APP_STATES} or {@link #POLL_TIMEOUT_MS} has
   * elapsed. A timeout is only logged; the caller asserts on the final state.
   *
   * @param appId application to wait for
   * @throws InterruptedException if interrupted while sleeping
   */
  private void waitForTerminalState(ApplicationId appId)
      throws InterruptedException {
    int pollElapsed = 0;
    while (true) {
      Thread.sleep(POLL_INTERVAL_MS);
      pollElapsed += POLL_INTERVAL_MS;

      if (TERMINAL_RM_APP_STATES.contains(getRMAppState(appId))) {
        return;
      }

      if (pollElapsed >= POLL_TIMEOUT_MS) {
        LOG.warn("application did not reach terminal state within 60 seconds");
        return;
      }
    }
  }

  /**
   * Checks that the report lists exactly one AM attempt (attempt 1,
   * container 1 of the job's application) and that the submit, start and
   * finish times are positive and non-decreasing in that order.
   *
   * @param jobReport report returned by the history service
   * @param jobId id of the job the report was requested for
   */
  private void verifyJobReport(JobReport jobReport, JobId jobId) {
    List<AMInfo> amInfos = jobReport.getAMInfos();
    Assert.assertEquals(1, amInfos.size());
    AMInfo amInfo = amInfos.get(0);
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(jobId.getAppId(), 1);
    ContainerId amContainerId = ContainerId.newContainerId(appAttemptId, 1);
    Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
    Assert.assertEquals(amContainerId, amInfo.getContainerId());

    // Each condition is asserted separately so a failure identifies which
    // timestamp relation was violated.
    long submitTime = jobReport.getSubmitTime();
    long startTime = jobReport.getStartTime();
    long finishTime = jobReport.getFinishTime();
    Assert.assertTrue(submitTime > 0);
    Assert.assertTrue(startTime > 0);
    Assert.assertTrue(startTime >= submitTime);
    Assert.assertTrue(finishTime > 0);
    Assert.assertTrue(finishTime >= startTime);
  }

  /**
   * Creates an RPC proxy to the history service at the address stored under
   * {@link JHAdminConfig#MR_HISTORY_ADDRESS} in the cluster configuration.
   *
   * @return a client proxy for {@link HSClientProtocol}
   */
  private HSClientProtocol instantiateHistoryProxy() {
    final String serviceAddr =
        mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
    final YarnRPC rpc = YarnRPC.create(conf);
    return (HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
        NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig());
  }
}