1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Mockito.doAnswer;
23 import static org.mockito.Mockito.doReturn;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.when;
29 
30 import java.io.ByteArrayOutputStream;
31 import java.io.File;
32 import java.io.FileOutputStream;
33 import java.io.IOException;
34 import java.io.OutputStream;
35 import java.net.InetSocketAddress;
36 import java.nio.ByteBuffer;
37 import java.security.PrivilegedExceptionAction;
38 import java.util.List;
39 import java.util.Map;
40 
41 import junit.framework.TestCase;
42 
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.fs.CommonConfigurationKeys;
47 import org.apache.hadoop.fs.FileContext;
48 import org.apache.hadoop.fs.FileUtil;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.io.Text;
51 import org.apache.hadoop.mapreduce.JobID;
52 import org.apache.hadoop.mapreduce.JobPriority;
53 import org.apache.hadoop.mapreduce.JobStatus.State;
54 import org.apache.hadoop.mapreduce.MRConfig;
55 import org.apache.hadoop.mapreduce.MRJobConfig;
56 import org.apache.hadoop.mapreduce.TypeConverter;
57 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
58 import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
59 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
60 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
61 import org.apache.hadoop.mapreduce.v2.util.MRApps;
62 import org.apache.hadoop.security.Credentials;
63 import org.apache.hadoop.security.SecurityUtil;
64 import org.apache.hadoop.security.UserGroupInformation;
65 import org.apache.hadoop.security.token.Token;
66 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
67 import org.apache.hadoop.yarn.api.ApplicationConstants;
68 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
69 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
70 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
71 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
72 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
73 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
74 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
75 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
76 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
77 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
78 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
79 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
80 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
81 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
82 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
83 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
84 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
85 import org.apache.hadoop.yarn.api.records.ApplicationId;
86 import org.apache.hadoop.yarn.api.records.ApplicationReport;
87 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
88 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
89 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
90 import org.apache.hadoop.yarn.api.records.QueueInfo;
91 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
92 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
93 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
94 import org.apache.hadoop.yarn.conf.YarnConfiguration;
95 import org.apache.hadoop.yarn.factories.RecordFactory;
96 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
97 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
98 import org.apache.hadoop.yarn.util.Records;
99 import org.apache.log4j.Appender;
100 import org.apache.log4j.Layout;
101 import org.apache.log4j.Logger;
102 import org.apache.log4j.SimpleLayout;
103 import org.apache.log4j.WriterAppender;
104 import org.junit.After;
105 import org.junit.Before;
106 import org.junit.Test;
107 import org.mockito.invocation.InvocationOnMock;
108 import org.mockito.stubbing.Answer;
109 
/**
 * Tests {@link YARNRunner} and verifies that the client-side plugin
 * works correctly.
 */
114 public class TestYARNRunner extends TestCase {
  // Logger for this test class.
  private static final Log LOG = LogFactory.getLog(TestYARNRunner.class);
  // Factory the tests use to instantiate YARN protocol record objects.
  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  // Default task profiling parameters truncated at the last '%', i.e. the
  // prefix before <LOG_DIR>/profile.out
  private static final String PROFILE_PARAMS =
      MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
          MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));

  // Runner under test; setUp() wraps it in a Mockito spy.
  private YARNRunner yarnRunner;
  // Mocked resource manager delegate created in setUp().
  private ResourceMgrDelegate resourceMgrDelegate;
  // Configuration shared by the fixture; recreated in setUp().
  private YarnConfiguration conf;
  // Spied client cache created in setUp().
  private ClientCache clientCache;
  // Application id generated in setUp() and the job id converted from it.
  private ApplicationId appId;
  private JobID jobId;
  // Scratch directory under "target"; recreated in setUp(), deleted in cleanup().
  private File testWorkDir =
      new File("target", TestYARNRunner.class.getName());
  // Mock returned by the spied runner's createApplicationSubmissionContext().
  private ApplicationSubmissionContext submissionContext;
  // Mock client delegate; assigned by the job-kill tests.
  private  ClientServiceDelegate clientDelegate;
  // Diagnostics text reported by the failed application in testJobSubmissionFailure().
  private static final String failString = "Rejected job";
134 
135   @Before
setUp()136   public void setUp() throws Exception {
137     resourceMgrDelegate = mock(ResourceMgrDelegate.class);
138     conf = new YarnConfiguration();
139     conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
140     clientCache = new ClientCache(conf, resourceMgrDelegate);
141     clientCache = spy(clientCache);
142     yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
143     yarnRunner = spy(yarnRunner);
144     submissionContext = mock(ApplicationSubmissionContext.class);
145     doAnswer(
146         new Answer<ApplicationSubmissionContext>() {
147           @Override
148           public ApplicationSubmissionContext answer(InvocationOnMock invocation)
149               throws Throwable {
150             return submissionContext;
151           }
152         }
153         ).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
154             any(String.class), any(Credentials.class));
155 
156     appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
157     jobId = TypeConverter.fromYarn(appId);
158     if (testWorkDir.exists()) {
159       FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
160     }
161     testWorkDir.mkdirs();
162   }
163 
164   @After
cleanup()165   public void cleanup() {
166     FileUtil.fullyDelete(testWorkDir);
167   }
168 
169   @Test(timeout=20000)
testJobKill()170   public void testJobKill() throws Exception {
171     clientDelegate = mock(ClientServiceDelegate.class);
172     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
173         org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
174             State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
175     when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);
176     doAnswer(
177         new Answer<ClientServiceDelegate>() {
178           @Override
179           public ClientServiceDelegate answer(InvocationOnMock invocation)
180               throws Throwable {
181             return clientDelegate;
182           }
183         }
184         ).when(clientCache).getClient(any(JobID.class));
185     yarnRunner.killJob(jobId);
186     verify(resourceMgrDelegate).killApplication(appId);
187     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
188         org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
189             State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
190     yarnRunner.killJob(jobId);
191     verify(clientDelegate).killJob(jobId);
192 
193     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
194     when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
195         .thenReturn(
196             ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
197                 "tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
198                 0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f,
199                 "tmp", null));
200     yarnRunner.killJob(jobId);
201     verify(clientDelegate).killJob(jobId);
202   }
203 
204   @Test(timeout=60000)
testJobKillTimeout()205   public void testJobKillTimeout() throws Exception {
206     long timeToWaitBeforeHardKill =
207         10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS;
208     conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
209         timeToWaitBeforeHardKill);
210     clientDelegate = mock(ClientServiceDelegate.class);
211     doAnswer(
212         new Answer<ClientServiceDelegate>() {
213           @Override
214           public ClientServiceDelegate answer(InvocationOnMock invocation)
215               throws Throwable {
216             return clientDelegate;
217           }
218         }
219       ).when(clientCache).getClient(any(JobID.class));
220     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
221         org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
222             State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
223     long startTimeMillis = System.currentTimeMillis();
224     yarnRunner.killJob(jobId);
225     assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill
226         + " ms.", System.currentTimeMillis() - startTimeMillis
227                   >= timeToWaitBeforeHardKill);
228   }
229 
230   @Test(timeout=20000)
testJobSubmissionFailure()231   public void testJobSubmissionFailure() throws Exception {
232     when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
233     thenReturn(appId);
234     ApplicationReport report = mock(ApplicationReport.class);
235     when(report.getApplicationId()).thenReturn(appId);
236     when(report.getDiagnostics()).thenReturn(failString);
237     when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FAILED);
238     when(resourceMgrDelegate.getApplicationReport(appId)).thenReturn(report);
239     Credentials credentials = new Credentials();
240     File jobxml = new File(testWorkDir, "job.xml");
241     OutputStream out = new FileOutputStream(jobxml);
242     conf.writeXml(out);
243     out.close();
244     try {
245       yarnRunner.submitJob(jobId, testWorkDir.getAbsolutePath().toString(), credentials);
246     } catch(IOException io) {
247       LOG.info("Logging exception:", io);
248       assertTrue(io.getLocalizedMessage().contains(failString));
249     }
250   }
251 
252   @Test(timeout=20000)
testResourceMgrDelegate()253   public void testResourceMgrDelegate() throws Exception {
254     /* we not want a mock of resource mgr delegate */
255     final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class);
256     ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
257       @Override
258       protected void serviceStart() throws Exception {
259         assertTrue(this.client instanceof YarnClientImpl);
260         ((YarnClientImpl) this.client).setRMClient(clientRMProtocol);
261       }
262     };
263     /* make sure kill calls finish application master */
264     when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
265     .thenReturn(KillApplicationResponse.newInstance(true));
266     delegate.killApplication(appId);
267     verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
268 
269     /* make sure getalljobs calls get all applications */
270     when(clientRMProtocol.getApplications(any(GetApplicationsRequest.class))).
271     thenReturn(recordFactory.newRecordInstance(GetApplicationsResponse.class));
272     delegate.getAllJobs();
273     verify(clientRMProtocol).getApplications(any(GetApplicationsRequest.class));
274 
275     /* make sure getapplication report is called */
276     when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
277     .thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
278     delegate.getApplicationReport(appId);
279     verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));
280 
281     /* make sure metrics is called */
282     GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance
283         (GetClusterMetricsResponse.class);
284     clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
285         YarnClusterMetrics.class));
286     when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
287     .thenReturn(clusterMetricsResponse);
288     delegate.getClusterMetrics();
289     verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));
290 
291     when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))).
292     thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class));
293     delegate.getActiveTrackers();
294     verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class));
295 
296     GetNewApplicationResponse newAppResponse = recordFactory.newRecordInstance(
297         GetNewApplicationResponse.class);
298     newAppResponse.setApplicationId(appId);
299     when(clientRMProtocol.getNewApplication(any(GetNewApplicationRequest.class))).
300     thenReturn(newAppResponse);
301     delegate.getNewJobID();
302     verify(clientRMProtocol).getNewApplication(any(GetNewApplicationRequest.class));
303 
304     GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance(
305         GetQueueInfoResponse.class);
306     queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class));
307     when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))).
308     thenReturn(queueInfoResponse);
309     delegate.getQueues();
310     verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class));
311 
312     GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance(
313         GetQueueUserAclsInfoResponse.class);
314     when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)))
315     .thenReturn(aclResponse);
316     delegate.getQueueAclsForCurrentUser();
317     verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
318   }
319 
320   @Test(timeout=20000)
testGetHSDelegationToken()321   public void testGetHSDelegationToken() throws Exception {
322     try {
323       Configuration conf = new Configuration();
324 
325       // Setup mock service
326       InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
327       Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress);
328 
329       InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
330       Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress);
331 
332       // Setup mock rm token
333       RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
334           new Text("owner"), new Text("renewer"), new Text("real"));
335       Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
336           new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice);
337       token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
338 
339       // Setup mock history token
340       org.apache.hadoop.yarn.api.records.Token historyToken =
341           org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0],
342             MRDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
343             hsTokenSevice.toString());
344       GetDelegationTokenResponse getDtResponse =
345           Records.newRecord(GetDelegationTokenResponse.class);
346       getDtResponse.setDelegationToken(historyToken);
347 
348       // mock services
349       MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
350       doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
351       doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
352           any(GetDelegationTokenRequest.class));
353 
354       ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
355       doReturn(rmTokenSevice).when(rmDelegate).getRMDelegationTokenService();
356 
357       ClientCache clientCache = mock(ClientCache.class);
358       doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
359 
360       Credentials creds = new Credentials();
361 
362       YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
363 
364       // No HS token if no RM token
365       yarnRunner.addHistoryToken(creds);
366       verify(mockHsProxy, times(0)).getDelegationToken(
367           any(GetDelegationTokenRequest.class));
368 
369       // No HS token if RM token, but secirity disabled.
370       creds.addToken(new Text("rmdt"), token);
371       yarnRunner.addHistoryToken(creds);
372       verify(mockHsProxy, times(0)).getDelegationToken(
373           any(GetDelegationTokenRequest.class));
374 
375       conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
376           "kerberos");
377       UserGroupInformation.setConfiguration(conf);
378       creds = new Credentials();
379 
380       // No HS token if no RM token, security enabled
381       yarnRunner.addHistoryToken(creds);
382       verify(mockHsProxy, times(0)).getDelegationToken(
383           any(GetDelegationTokenRequest.class));
384 
385       // HS token if RM token present, security enabled
386       creds.addToken(new Text("rmdt"), token);
387       yarnRunner.addHistoryToken(creds);
388       verify(mockHsProxy, times(1)).getDelegationToken(
389           any(GetDelegationTokenRequest.class));
390 
391       // No additional call to get HS token if RM and HS token present
392       yarnRunner.addHistoryToken(creds);
393       verify(mockHsProxy, times(1)).getDelegationToken(
394           any(GetDelegationTokenRequest.class));
395     } finally {
396       // Back to defaults.
397       UserGroupInformation.setConfiguration(new Configuration());
398     }
399   }
400 
401   @Test(timeout=20000)
testHistoryServerToken()402   public void testHistoryServerToken() throws Exception {
403     //Set the master principal in the config
404     conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
405 
406     final String masterPrincipal = Master.getMasterPrincipal(conf);
407 
408     final MRClientProtocol hsProxy = mock(MRClientProtocol.class);
409     when(hsProxy.getDelegationToken(any(GetDelegationTokenRequest.class))).thenAnswer(
410         new Answer<GetDelegationTokenResponse>() {
411           public GetDelegationTokenResponse answer(InvocationOnMock invocation) {
412             GetDelegationTokenRequest request =
413                 (GetDelegationTokenRequest)invocation.getArguments()[0];
414             // check that the renewer matches the cluster's RM principal
415             assertEquals(masterPrincipal, request.getRenewer() );
416 
417             org.apache.hadoop.yarn.api.records.Token token =
418                 recordFactory.newRecordInstance(org.apache.hadoop.yarn.api.records.Token.class);
419             // none of these fields matter for the sake of the test
420             token.setKind("");
421             token.setService("");
422             token.setIdentifier(ByteBuffer.allocate(0));
423             token.setPassword(ByteBuffer.allocate(0));
424             GetDelegationTokenResponse tokenResponse =
425                 recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
426             tokenResponse.setDelegationToken(token);
427             return tokenResponse;
428           }
429         });
430 
431     UserGroupInformation.createRemoteUser("someone").doAs(
432         new PrivilegedExceptionAction<Void>() {
433           @Override
434           public Void run() throws Exception {
435             yarnRunner = new YARNRunner(conf, null, null);
436             yarnRunner.getDelegationTokenFromHS(hsProxy);
437             verify(hsProxy).
438               getDelegationToken(any(GetDelegationTokenRequest.class));
439             return null;
440           }
441         });
442   }
443 
444   @Test(timeout=20000)
testAMAdminCommandOpts()445   public void testAMAdminCommandOpts() throws Exception {
446     JobConf jobConf = new JobConf();
447 
448     jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true");
449     jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m");
450 
451     YARNRunner yarnRunner = new YARNRunner(jobConf);
452 
453     ApplicationSubmissionContext submissionContext =
454         buildSubmitContext(yarnRunner, jobConf);
455 
456     ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
457     List<String> commands = containerSpec.getCommands();
458 
459     int index = 0;
460     int adminIndex = 0;
461     int adminPos = -1;
462     int userIndex = 0;
463     int userPos = -1;
464     int tmpDirPos = -1;
465 
466     for(String command : commands) {
467       if(command != null) {
468         assertFalse("Profiler should be disabled by default",
469             command.contains(PROFILE_PARAMS));
470         adminPos = command.indexOf("-Djava.net.preferIPv4Stack=true");
471         if(adminPos >= 0)
472           adminIndex = index;
473 
474         userPos = command.indexOf("-Xmx1024m");
475         if(userPos >= 0)
476           userIndex = index;
477 
478         tmpDirPos = command.indexOf("-Djava.io.tmpdir=");
479       }
480 
481       index++;
482     }
483 
484     // Check java.io.tmpdir opts are set in the commands
485     assertTrue("java.io.tmpdir is not set for AM", tmpDirPos > 0);
486 
487     // Check both admin java opts and user java opts are in the commands
488     assertTrue("AM admin command opts not in the commands.", adminPos > 0);
489     assertTrue("AM user command opts not in the commands.", userPos > 0);
490 
491     // Check the admin java opts is before user java opts in the commands
492     if(adminIndex == userIndex) {
493       assertTrue("AM admin command opts is after user command opts.", adminPos < userPos);
494     } else {
495       assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
496     }
497   }
498   @Test(timeout=20000)
testWarnCommandOpts()499   public void testWarnCommandOpts() throws Exception {
500     Logger logger = Logger.getLogger(YARNRunner.class);
501 
502     ByteArrayOutputStream bout = new ByteArrayOutputStream();
503     Layout layout = new SimpleLayout();
504     Appender appender = new WriterAppender(layout, bout);
505     logger.addAppender(appender);
506 
507     JobConf jobConf = new JobConf();
508 
509     jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
510     jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar");
511 
512     YARNRunner yarnRunner = new YARNRunner(jobConf);
513 
514     @SuppressWarnings("unused")
515     ApplicationSubmissionContext submissionContext =
516         buildSubmitContext(yarnRunner, jobConf);
517 
518     String logMsg = bout.toString();
519     assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
520     		"yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
521         "longer function if hadoop native libraries are used. These values " +
522     		"should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
523         "env using yarn.app.mapreduce.am.admin.user.env config settings."));
524     assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
525         "yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
526         "function if hadoop native libraries are used. These values should " +
527         "be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
528         "using yarn.app.mapreduce.am.env config settings."));
529   }
530 
531   @Test(timeout=20000)
testAMProfiler()532   public void testAMProfiler() throws Exception {
533     JobConf jobConf = new JobConf();
534 
535     jobConf.setBoolean(MRJobConfig.MR_AM_PROFILE, true);
536 
537     YARNRunner yarnRunner = new YARNRunner(jobConf);
538 
539     ApplicationSubmissionContext submissionContext =
540         buildSubmitContext(yarnRunner, jobConf);
541 
542     ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
543     List<String> commands = containerSpec.getCommands();
544 
545     for(String command : commands) {
546       if (command != null) {
547         if (command.contains(PROFILE_PARAMS)) {
548           return;
549         }
550       }
551     }
552     throw new IllegalStateException("Profiler opts not found!");
553   }
554 
555   @Test
testAMStandardEnv()556   public void testAMStandardEnv() throws Exception {
557     final String ADMIN_LIB_PATH = "foo";
558     final String USER_LIB_PATH = "bar";
559     final String USER_SHELL = "shell";
560     JobConf jobConf = new JobConf();
561 
562     jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" +
563         ADMIN_LIB_PATH);
564     jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH="
565         + USER_LIB_PATH);
566     jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);
567 
568     YARNRunner yarnRunner = new YARNRunner(jobConf);
569     ApplicationSubmissionContext appSubCtx =
570         buildSubmitContext(yarnRunner, jobConf);
571 
572     // make sure PWD is first in the lib path
573     ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
574     Map<String, String> env = clc.getEnvironment();
575     String libPath = env.get(Environment.LD_LIBRARY_PATH.name());
576     assertNotNull("LD_LIBRARY_PATH not set", libPath);
577     String cps = jobConf.getBoolean(
578         MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
579         MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
580         ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
581     assertEquals("Bad AM LD_LIBRARY_PATH setting",
582         MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
583         + cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH, libPath);
584 
585     // make sure SHELL is set
586     String shell = env.get(Environment.SHELL.name());
587     assertNotNull("SHELL not set", shell);
588     assertEquals("Bad SHELL setting", USER_SHELL, shell);
589   }
590 
buildSubmitContext( YARNRunner yarnRunner, JobConf jobConf)591   private ApplicationSubmissionContext buildSubmitContext(
592       YARNRunner yarnRunner, JobConf jobConf) throws IOException {
593     File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);
594     OutputStream out = new FileOutputStream(jobxml);
595     conf.writeXml(out);
596     out.close();
597 
598     File jobsplit = new File(testWorkDir, MRJobConfig.JOB_SPLIT);
599     out = new FileOutputStream(jobsplit);
600     out.close();
601 
602     File jobsplitmetainfo = new File(testWorkDir,
603         MRJobConfig.JOB_SPLIT_METAINFO);
604     out = new FileOutputStream(jobsplitmetainfo);
605     out.close();
606 
607     return yarnRunner.createApplicationSubmissionContext(jobConf,
608         testWorkDir.toString(), new Credentials());
609   }
610 }
611