/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test YarnRunner and make sure the client side plugin works
 * fine.
 *
 * <p>NOTE: this class extends {@link TestCase}, so JUnit executes it with its
 * JUnit 3 compatible runner. That runner drives {@code setUp()} /
 * {@code tearDown()} and methods whose names start with {@code test}; it does
 * not act on the {@code @Before}, {@code @After} or {@code @Test(timeout=...)}
 * annotations. {@link #tearDown()} therefore delegates to {@link #cleanup()}
 * so the work directory is actually removed after each test.
 */
public class TestYARNRunner extends TestCase {
  private static final Log LOG = LogFactory.getLog(TestYARNRunner.class);
  private static final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  // prefix before <LOG_DIR>/profile.out
  private static final String PROFILE_PARAMS =
      MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
          MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));

  // JVM options used by the AM command-line tests
  private static final String ADMIN_OPT = "-Djava.net.preferIPv4Stack=true";
  private static final String USER_OPT = "-Xmx1024m";
  private static final String TMPDIR_OPT = "-Djava.io.tmpdir=";

  private YARNRunner yarnRunner;
  private ResourceMgrDelegate resourceMgrDelegate;
  private YarnConfiguration conf;
  private ClientCache clientCache;
  private ApplicationId appId;
  private JobID jobId;
  private File testWorkDir =
      new File("target", TestYARNRunner.class.getName());
  private ApplicationSubmissionContext submissionContext;
  private ClientServiceDelegate clientDelegate;
  private static final String failString = "Rejected job";

  /**
   * Builds a spied {@link YARNRunner} on top of a mocked
   * {@link ResourceMgrDelegate} and a spied {@link ClientCache}, stubs
   * {@code createApplicationSubmissionContext} to return a mock context, and
   * (re)creates an empty work directory.
   */
  @Before
  public void setUp() throws Exception {
    resourceMgrDelegate = mock(ResourceMgrDelegate.class);
    conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
    clientCache = new ClientCache(conf, resourceMgrDelegate);
    clientCache = spy(clientCache);
    yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
    yarnRunner = spy(yarnRunner);
    submissionContext = mock(ApplicationSubmissionContext.class);
    doAnswer(
        new Answer<ApplicationSubmissionContext>() {
          @Override
          public ApplicationSubmissionContext answer(InvocationOnMock invocation)
              throws Throwable {
            return submissionContext;
          }
        }
        ).when(yarnRunner).createApplicationSubmissionContext(any(Configuration.class),
            any(String.class), any(Credentials.class));

    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    jobId = TypeConverter.fromYarn(appId);
    if (testWorkDir.exists()) {
      FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
    }
    testWorkDir.mkdirs();
  }

  /** Deletes the per-test work directory. */
  @After
  public void cleanup() {
    FileUtil.fullyDelete(testWorkDir);
  }

  /**
   * JUnit 3 tear-down hook. Delegates to {@link #cleanup()} because the
   * {@code @After} annotation alone is not honored for a {@link TestCase}
   * subclass.
   */
  @Override
  protected void tearDown() throws Exception {
    try {
      cleanup();
    } finally {
      super.tearDown();
    }
  }

  /**
   * Exercises {@code YARNRunner.killJob} for three situations: a job in PREP
   * state (the RM delegate must kill the application), a job in RUNNING state
   * (the client delegate's {@code killJob} must be called), and a job without
   * status whose application report is FINISHED (no additional
   * {@code killJob} call on the client delegate).
   */
  @Test(timeout=20000)
  public void testJobKill() throws Exception {
    clientDelegate = mock(ClientServiceDelegate.class);
    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
        org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
            State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
    when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);
    doAnswer(
        new Answer<ClientServiceDelegate>() {
          @Override
          public ClientServiceDelegate answer(InvocationOnMock invocation)
              throws Throwable {
            return clientDelegate;
          }
        }
        ).when(clientCache).getClient(any(JobID.class));
    yarnRunner.killJob(jobId);
    verify(resourceMgrDelegate).killApplication(appId);

    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
        org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
            State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
    yarnRunner.killJob(jobId);
    verify(clientDelegate).killJob(jobId);

    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
    when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
        .thenReturn(
            ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
                "tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
                0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 0f,
                "tmp", null));
    yarnRunner.killJob(jobId);
    // still exactly one killJob invocation in total
    verify(clientDelegate).killJob(jobId);
  }

  /**
   * Raises the AM hard-kill timeout to 10 seconds above its default and
   * asserts that killing a job that stays RUNNING takes at least that long.
   */
  @Test(timeout=60000)
  public void testJobKillTimeout() throws Exception {
    long timeToWaitBeforeHardKill =
        10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS;
    conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
        timeToWaitBeforeHardKill);
    clientDelegate = mock(ClientServiceDelegate.class);
    doAnswer(
        new Answer<ClientServiceDelegate>() {
          @Override
          public ClientServiceDelegate answer(InvocationOnMock invocation)
              throws Throwable {
            return clientDelegate;
          }
        }
        ).when(clientCache).getClient(any(JobID.class));
    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
        org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
            State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
    long startTimeMillis = System.currentTimeMillis();
    yarnRunner.killJob(jobId);
    assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill
        + " ms.", System.currentTimeMillis() - startTimeMillis
        >= timeToWaitBeforeHardKill);
  }

  /**
   * Submits a job whose application report is in FAILED state and checks that
   * {@code submitJob} raises an {@link IOException} carrying the report's
   * diagnostics.
   */
  @Test(timeout=20000)
  public void testJobSubmissionFailure() throws Exception {
    when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
        thenReturn(appId);
    ApplicationReport report = mock(ApplicationReport.class);
    when(report.getApplicationId()).thenReturn(appId);
    when(report.getDiagnostics()).thenReturn(failString);
    when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FAILED);
    when(resourceMgrDelegate.getApplicationReport(appId)).thenReturn(report);
    Credentials credentials = new Credentials();
    File jobxml = new File(testWorkDir, "job.xml");
    OutputStream out = new FileOutputStream(jobxml);
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }
    try {
      yarnRunner.submitJob(jobId, testWorkDir.getAbsolutePath().toString(), credentials);
      // Without this the test would pass even if no failure were reported.
      fail("submitJob should have thrown an IOException containing: " + failString);
    } catch (IOException io) {
      LOG.info("Logging exception:", io);
      assertTrue(io.getLocalizedMessage().contains(failString));
    }
  }

  /**
   * Verifies that each {@link ResourceMgrDelegate} operation used here is
   * forwarded to the matching {@link ApplicationClientProtocol} call.
   */
  @Test(timeout=20000)
  public void testResourceMgrDelegate() throws Exception {
    /* we do not want a mock of resource mgr delegate */
    final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class);
    ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
      @Override
      protected void serviceStart() throws Exception {
        assertTrue(this.client instanceof YarnClientImpl);
        ((YarnClientImpl) this.client).setRMClient(clientRMProtocol);
      }
    };
    /* make sure kill calls finish application master */
    when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
        .thenReturn(KillApplicationResponse.newInstance(true));
    delegate.killApplication(appId);
    verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));

    /* make sure getalljobs calls get all applications */
    when(clientRMProtocol.getApplications(any(GetApplicationsRequest.class))).
        thenReturn(recordFactory.newRecordInstance(GetApplicationsResponse.class));
    delegate.getAllJobs();
    verify(clientRMProtocol).getApplications(any(GetApplicationsRequest.class));

    /* make sure getapplication report is called */
    when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
        .thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
    delegate.getApplicationReport(appId);
    verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));

    /* make sure metrics is called */
    GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance
        (GetClusterMetricsResponse.class);
    clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
        YarnClusterMetrics.class));
    when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
        .thenReturn(clusterMetricsResponse);
    delegate.getClusterMetrics();
    verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));

    /* make sure getActiveTrackers asks for the cluster nodes */
    when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))).
        thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class));
    delegate.getActiveTrackers();
    verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class));

    /* make sure getNewJobID asks for a new application */
    GetNewApplicationResponse newAppResponse = recordFactory.newRecordInstance(
        GetNewApplicationResponse.class);
    newAppResponse.setApplicationId(appId);
    when(clientRMProtocol.getNewApplication(any(GetNewApplicationRequest.class))).
        thenReturn(newAppResponse);
    delegate.getNewJobID();
    verify(clientRMProtocol).getNewApplication(any(GetNewApplicationRequest.class));

    /* make sure getQueues asks for queue info */
    GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance(
        GetQueueInfoResponse.class);
    queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class));
    when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))).
        thenReturn(queueInfoResponse);
    delegate.getQueues();
    verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class));

    /* make sure getQueueAclsForCurrentUser asks for the queue user ACLs */
    GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance(
        GetQueueUserAclsInfoResponse.class);
    when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)))
        .thenReturn(aclResponse);
    delegate.getQueueAclsForCurrentUser();
    verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
  }

  /**
   * Checks when {@code YARNRunner.addHistoryToken} requests a history server
   * delegation token: never without an RM token or with security disabled,
   * exactly once with an RM token and kerberos enabled, and not again on a
   * repeated call with the same credentials.
   */
  @Test(timeout=20000)
  public void testGetHSDelegationToken() throws Exception {
    try {
      Configuration conf = new Configuration();

      // Setup mock service
      InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
      Text rmTokenService = SecurityUtil.buildTokenService(mockRmAddress);

      InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
      Text hsTokenService = SecurityUtil.buildTokenService(mockHsAddress);

      // Setup mock rm token
      RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
          new Text("owner"), new Text("renewer"), new Text("real"));
      Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
          new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenService);
      token.setKind(RMDelegationTokenIdentifier.KIND_NAME);

      // Setup mock history token
      org.apache.hadoop.yarn.api.records.Token historyToken =
          org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0],
              MRDelegationTokenIdentifier.KIND_NAME.toString(), new byte[0],
              hsTokenService.toString());
      GetDelegationTokenResponse getDtResponse =
          Records.newRecord(GetDelegationTokenResponse.class);
      getDtResponse.setDelegationToken(historyToken);

      // mock services
      MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
      doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
      doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
          any(GetDelegationTokenRequest.class));

      ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
      doReturn(rmTokenService).when(rmDelegate).getRMDelegationTokenService();

      ClientCache clientCache = mock(ClientCache.class);
      doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();

      Credentials creds = new Credentials();

      YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);

      // No HS token if no RM token
      yarnRunner.addHistoryToken(creds);
      verify(mockHsProxy, times(0)).getDelegationToken(
          any(GetDelegationTokenRequest.class));

      // No HS token if RM token, but security disabled.
      creds.addToken(new Text("rmdt"), token);
      yarnRunner.addHistoryToken(creds);
      verify(mockHsProxy, times(0)).getDelegationToken(
          any(GetDelegationTokenRequest.class));

      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
          "kerberos");
      UserGroupInformation.setConfiguration(conf);
      creds = new Credentials();

      // No HS token if no RM token, security enabled
      yarnRunner.addHistoryToken(creds);
      verify(mockHsProxy, times(0)).getDelegationToken(
          any(GetDelegationTokenRequest.class));

      // HS token if RM token present, security enabled
      creds.addToken(new Text("rmdt"), token);
      yarnRunner.addHistoryToken(creds);
      verify(mockHsProxy, times(1)).getDelegationToken(
          any(GetDelegationTokenRequest.class));

      // No additional call to get HS token if RM and HS token present
      yarnRunner.addHistoryToken(creds);
      verify(mockHsProxy, times(1)).getDelegationToken(
          any(GetDelegationTokenRequest.class));
    } finally {
      // Back to defaults.
      UserGroupInformation.setConfiguration(new Configuration());
    }
  }

  /**
   * Checks that the delegation token request sent to the history server names
   * the configured RM principal as renewer.
   */
  @Test(timeout=20000)
  public void testHistoryServerToken() throws Exception {
    //Set the master principal in the config
    conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");

    final String masterPrincipal = Master.getMasterPrincipal(conf);

    final MRClientProtocol hsProxy = mock(MRClientProtocol.class);
    when(hsProxy.getDelegationToken(any(GetDelegationTokenRequest.class))).thenAnswer(
        new Answer<GetDelegationTokenResponse>() {
          @Override
          public GetDelegationTokenResponse answer(InvocationOnMock invocation) {
            GetDelegationTokenRequest request =
                (GetDelegationTokenRequest)invocation.getArguments()[0];
            // check that the renewer matches the cluster's RM principal
            assertEquals(masterPrincipal, request.getRenewer() );

            org.apache.hadoop.yarn.api.records.Token token =
                recordFactory.newRecordInstance(org.apache.hadoop.yarn.api.records.Token.class);
            // none of these fields matter for the sake of the test
            token.setKind("");
            token.setService("");
            token.setIdentifier(ByteBuffer.allocate(0));
            token.setPassword(ByteBuffer.allocate(0));
            GetDelegationTokenResponse tokenResponse =
                recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
            tokenResponse.setDelegationToken(token);
            return tokenResponse;
          }
        });

    UserGroupInformation.createRemoteUser("someone").doAs(
        new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            yarnRunner = new YARNRunner(conf, null, null);
            yarnRunner.getDelegationTokenFromHS(hsProxy);
            verify(hsProxy).
                getDelegationToken(any(GetDelegationTokenRequest.class));
            return null;
          }
        });
  }

  /**
   * Checks that the AM launch commands contain {@code java.io.tmpdir}, the
   * admin JVM options and the user JVM options, that the admin options come
   * before the user options, and that profiling is off by default.
   */
  @Test(timeout=20000)
  public void testAMAdminCommandOpts() throws Exception {
    JobConf jobConf = new JobConf();

    jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, ADMIN_OPT);
    jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, USER_OPT);

    YARNRunner yarnRunner = new YARNRunner(jobConf);

    ApplicationSubmissionContext submissionContext =
        buildSubmitContext(yarnRunner, jobConf);

    ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
    List<String> commands = containerSpec.getCommands();

    // Index of the command and offset within it where each option was seen.
    // Values are only updated on a hit, so a later command that lacks the
    // option cannot erase an earlier match.
    int index = 0;
    int adminIndex = -1;
    int adminPos = -1;
    int userIndex = -1;
    int userPos = -1;
    boolean tmpDirFound = false;

    for (String command : commands) {
      if (command != null) {
        assertFalse("Profiler should be disabled by default",
            command.contains(PROFILE_PARAMS));

        int pos = command.indexOf(ADMIN_OPT);
        if (pos >= 0) {
          adminPos = pos;
          adminIndex = index;
        }

        pos = command.indexOf(USER_OPT);
        if (pos >= 0) {
          userPos = pos;
          userIndex = index;
        }

        if (command.indexOf(TMPDIR_OPT) >= 0) {
          tmpDirFound = true;
        }
      }

      index++;
    }

    // Check java.io.tmpdir opts are set in the commands
    assertTrue("java.io.tmpdir is not set for AM", tmpDirFound);

    // Check both admin java opts and user java opts are in the commands
    assertTrue("AM admin command opts not in the commands.", adminPos >= 0);
    assertTrue("AM user command opts not in the commands.", userPos >= 0);

    // Check the admin java opts is before user java opts in the commands
    if (adminIndex == userIndex) {
      assertTrue("AM admin command opts is after user command opts.", adminPos < userPos);
    } else {
      assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
    }
  }

  /**
   * Checks that a warning is logged by {@link YARNRunner} when
   * {@code -Djava.library.path} appears in the AM admin or user command opts.
   */
  @Test(timeout=20000)
  public void testWarnCommandOpts() throws Exception {
    Logger logger = Logger.getLogger(YARNRunner.class);

    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    Layout layout = new SimpleLayout();
    Appender appender = new WriterAppender(layout, bout);
    logger.addAppender(appender);
    try {
      JobConf jobConf = new JobConf();

      jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true -Djava.library.path=foo");
      jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m -Djava.library.path=bar");

      YARNRunner yarnRunner = new YARNRunner(jobConf);

      // Only the logging side effect matters; the returned context is unused.
      buildSubmitContext(yarnRunner, jobConf);

      String logMsg = bout.toString();
      assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
          "yarn.app.mapreduce.am.admin-command-opts can cause programs to no " +
          "longer function if hadoop native libraries are used. These values " +
          "should be set as part of the LD_LIBRARY_PATH in the app master JVM " +
          "env using yarn.app.mapreduce.am.admin.user.env config settings."));
      assertTrue(logMsg.contains("WARN - Usage of -Djava.library.path in " +
          "yarn.app.mapreduce.am.command-opts can cause programs to no longer " +
          "function if hadoop native libraries are used. These values should " +
          "be set as part of the LD_LIBRARY_PATH in the app master JVM env " +
          "using yarn.app.mapreduce.am.env config settings."));
    } finally {
      // Do not leave the capturing appender attached to the shared logger.
      logger.removeAppender(appender);
    }
  }

  /**
   * Checks that enabling {@code MR_AM_PROFILE} adds the profiler parameters
   * to one of the AM launch commands.
   */
  @Test(timeout=20000)
  public void testAMProfiler() throws Exception {
    JobConf jobConf = new JobConf();

    jobConf.setBoolean(MRJobConfig.MR_AM_PROFILE, true);

    YARNRunner yarnRunner = new YARNRunner(jobConf);

    ApplicationSubmissionContext submissionContext =
        buildSubmitContext(yarnRunner, jobConf);

    ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
    List<String> commands = containerSpec.getCommands();

    boolean profilerFound = false;
    for (String command : commands) {
      if (command != null && command.contains(PROFILE_PARAMS)) {
        profilerFound = true;
        break;
      }
    }
    assertTrue("Profiler opts not found!", profilerFound);
  }

  /**
   * Checks the AM container environment: {@code LD_LIBRARY_PATH} must be PWD,
   * then the admin path, then the user path, and {@code SHELL} must carry the
   * configured admin user shell.
   */
  @Test
  public void testAMStandardEnv() throws Exception {
    final String ADMIN_LIB_PATH = "foo";
    final String USER_LIB_PATH = "bar";
    final String USER_SHELL = "shell";
    JobConf jobConf = new JobConf();

    jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" +
        ADMIN_LIB_PATH);
    jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH="
        + USER_LIB_PATH);
    jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);

    YARNRunner yarnRunner = new YARNRunner(jobConf);
    ApplicationSubmissionContext appSubCtx =
        buildSubmitContext(yarnRunner, jobConf);

    // make sure PWD is first in the lib path
    ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
    Map<String, String> env = clc.getEnvironment();
    String libPath = env.get(Environment.LD_LIBRARY_PATH.name());
    assertNotNull("LD_LIBRARY_PATH not set", libPath);
    String cps = jobConf.getBoolean(
        MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
        MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
        ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
    assertEquals("Bad AM LD_LIBRARY_PATH setting",
        MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
        + cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH, libPath);

    // make sure SHELL is set
    String shell = env.get(Environment.SHELL.name());
    assertNotNull("SHELL not set", shell);
    assertEquals("Bad SHELL setting", USER_SHELL, shell);
  }

  /**
   * Writes the job configuration file plus empty split and split-metainfo
   * files into the work directory, then asks {@code yarnRunner} to build the
   * application submission context from them.
   *
   * @param yarnRunner runner used to create the submission context
   * @param jobConf job configuration handed to the runner
   * @return the submission context created by {@code yarnRunner}
   * @throws IOException if one of the files cannot be written or the runner
   *         fails
   */
  private ApplicationSubmissionContext buildSubmitContext(
      YARNRunner yarnRunner, JobConf jobConf) throws IOException {
    File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);
    OutputStream out = new FileOutputStream(jobxml);
    try {
      conf.writeXml(out);
    } finally {
      out.close();
    }

    createEmptyFile(new File(testWorkDir, MRJobConfig.JOB_SPLIT));
    createEmptyFile(new File(testWorkDir, MRJobConfig.JOB_SPLIT_METAINFO));

    return yarnRunner.createApplicationSubmissionContext(jobConf,
        testWorkDir.toString(), new Credentials());
  }

  /** Creates (or truncates) {@code file} so that it exists with no content. */
  private static void createEmptyFile(File file) throws IOException {
    new FileOutputStream(file).close();
  }
}