1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server; 20 21 import static org.junit.Assert.fail; 22 23 import java.io.File; 24 import java.io.IOException; 25 import java.net.InetSocketAddress; 26 import java.util.ArrayList; 27 import java.util.Arrays; 28 import java.util.Collection; 29 import java.util.LinkedList; 30 import java.util.List; 31 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 import org.apache.hadoop.conf.Configuration; 35 import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 36 import org.apache.hadoop.io.DataInputBuffer; 37 import org.apache.hadoop.minikdc.KerberosSecurityTestcase; 38 import org.apache.hadoop.net.NetUtils; 39 import org.apache.hadoop.security.UserGroupInformation; 40 import org.apache.hadoop.security.token.SecretManager.InvalidToken; 41 import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 42 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; 43 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; 44 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 45 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; 46 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; 47 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; 48 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; 49 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 50 import org.apache.hadoop.yarn.api.records.ApplicationId; 51 import org.apache.hadoop.yarn.api.records.ContainerId; 52 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 53 import org.apache.hadoop.yarn.api.records.ContainerState; 54 import org.apache.hadoop.yarn.api.records.NodeId; 55 import org.apache.hadoop.yarn.api.records.Priority; 56 import org.apache.hadoop.yarn.api.records.Resource; 57 import org.apache.hadoop.yarn.api.records.SerializedException; 58 import org.apache.hadoop.yarn.api.records.Token; 59 import org.apache.hadoop.yarn.client.NMProxy; 60 import org.apache.hadoop.yarn.conf.YarnConfiguration; 61 import org.apache.hadoop.yarn.exceptions.YarnException; 62 import org.apache.hadoop.yarn.factories.RecordFactory; 63 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 64 import org.apache.hadoop.yarn.ipc.YarnRPC; 65 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; 66 import org.apache.hadoop.yarn.security.NMTokenIdentifier; 67 import org.apache.hadoop.yarn.server.nodemanager.Context; 68 import org.apache.hadoop.yarn.server.nodemanager.NodeManager; 69 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; 70 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; 71 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; 72 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; 73 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; 74 import org.apache.hadoop.yarn.server.utils.BuilderUtils; 75 import org.apache.hadoop.yarn.util.ConverterUtils; 76 import org.apache.hadoop.yarn.util.Records; 77 import org.junit.After; 78 import org.junit.Assert; 79 import org.junit.Before; 80 import org.junit.Test; 81 import org.junit.runner.RunWith; 82 import org.junit.runners.Parameterized; 83 import org.junit.runners.Parameterized.Parameters; 84 85 import com.google.common.io.ByteArrayDataInput; 86 import com.google.common.io.ByteStreams; 87 88 @RunWith(Parameterized.class) 89 public class TestContainerManagerSecurity extends KerberosSecurityTestcase { 90 91 static Log LOG = LogFactory.getLog(TestContainerManagerSecurity.class); 92 static final RecordFactory recordFactory = RecordFactoryProvider 93 .getRecordFactory(null); 94 private static MiniYARNCluster yarnCluster; 95 private static final File testRootDir = new File("target", 96 TestContainerManagerSecurity.class.getName() + "-root"); 97 private static File httpSpnegoKeytabFile = new File(testRootDir, 98 "httpSpnegoKeytabFile.keytab"); 99 private static String httpSpnegoPrincipal = "HTTP/localhost@EXAMPLE.COM"; 100 101 private Configuration conf; 102 103 @Before setUp()104 public void setUp() throws Exception { 105 testRootDir.mkdirs(); 106 httpSpnegoKeytabFile.deleteOnExit(); 107 getKdc().createPrincipal(httpSpnegoKeytabFile, httpSpnegoPrincipal); 108 } 109 110 @After tearDown()111 public void tearDown() { 112 testRootDir.delete(); 113 } 114 115 @Parameters configs()116 public static Collection<Object[]> configs() { 117 Configuration configurationWithoutSecurity = new Configuration(); 118 configurationWithoutSecurity.set( 119 CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple"); 120 121 Configuration configurationWithSecurity = new Configuration(); 122 configurationWithSecurity.set( 123 CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); 124 configurationWithSecurity.set( 125 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal); 126 configurationWithSecurity.set( 127 YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, 128 httpSpnegoKeytabFile.getAbsolutePath()); 129 configurationWithSecurity.set( 130 YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal); 131 configurationWithSecurity.set( 132 YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, 133 httpSpnegoKeytabFile.getAbsolutePath()); 134 135 return Arrays.asList(new Object[][] { { configurationWithoutSecurity }, 136 { configurationWithSecurity } }); 137 } 138 TestContainerManagerSecurity(Configuration conf)139 public TestContainerManagerSecurity(Configuration conf) { 140 conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L); 141 UserGroupInformation.setConfiguration(conf); 142 this.conf = conf; 143 } 144 145 @Test (timeout = 120000) testContainerManager()146 public void testContainerManager() throws Exception { 147 try { 148 yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class 149 .getName(), 1, 1, 1); 150 yarnCluster.init(conf); 151 yarnCluster.start(); 152 153 // TestNMTokens. 154 testNMTokens(conf); 155 156 // Testing for container token tampering 157 testContainerToken(conf); 158 159 } catch (Exception e) { 160 e.printStackTrace(); 161 throw e; 162 } finally { 163 if (yarnCluster != null) { 164 yarnCluster.stop(); 165 yarnCluster = null; 166 } 167 } 168 } 169 170 @Test (timeout = 120000) testContainerManagerWithEpoch()171 public void testContainerManagerWithEpoch() throws Exception { 172 try { 173 yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class 174 .getName(), 1, 1, 1); 175 yarnCluster.init(conf); 176 yarnCluster.start(); 177 178 // Testing for container token tampering 179 testContainerTokenWithEpoch(conf); 180 181 } finally { 182 if (yarnCluster != null) { 183 yarnCluster.stop(); 184 yarnCluster = null; 185 } 186 } 187 } 188 testNMTokens(Configuration conf)189 private void testNMTokens(Configuration conf) throws Exception { 190 NMTokenSecretManagerInRM nmTokenSecretManagerRM = 191 yarnCluster.getResourceManager().getRMContext() 192 .getNMTokenSecretManager(); 193 NMTokenSecretManagerInNM nmTokenSecretManagerNM = 194 yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager(); 195 RMContainerTokenSecretManager containerTokenSecretManager = 196 yarnCluster.getResourceManager().getRMContext(). 197 getContainerTokenSecretManager(); 198 199 NodeManager nm = yarnCluster.getNodeManager(0); 200 201 waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM, nm); 202 203 // Both id should be equal. 204 Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(), 205 nmTokenSecretManagerRM.getCurrentKey().getKeyId()); 206 207 /* 208 * Below cases should be tested. 209 * 1) If Invalid NMToken is used then it should be rejected. 210 * 2) If valid NMToken but belonging to another Node is used then that 211 * too should be rejected. 212 * 3) NMToken for say appAttempt-1 is used for starting/stopping/retrieving 213 * status for container with containerId for say appAttempt-2 should 214 * be rejected. 215 * 4) After start container call is successful nmtoken should have been 216 * saved in NMTokenSecretManagerInNM. 217 * 5) If start container call was successful (no matter if container is 218 * still running or not), appAttempt->NMToken should be present in 219 * NMTokenSecretManagerInNM's cache. Any future getContainerStatus call 220 * for containerId belonging to that application attempt using 221 * applicationAttempt's older nmToken should not get any invalid 222 * nmToken error. (This can be best tested if we roll over NMToken 223 * master key twice). 224 */ 225 YarnRPC rpc = YarnRPC.create(conf); 226 String user = "test"; 227 Resource r = Resource.newInstance(1024, 1); 228 229 ApplicationId appId = ApplicationId.newInstance(1, 1); 230 ApplicationAttemptId validAppAttemptId = 231 ApplicationAttemptId.newInstance(appId, 1); 232 233 ContainerId validContainerId = 234 ContainerId.newContainerId(validAppAttemptId, 0); 235 236 NodeId validNode = yarnCluster.getNodeManager(0).getNMContext().getNodeId(); 237 NodeId invalidNode = NodeId.newInstance("InvalidHost", 1234); 238 239 240 org.apache.hadoop.yarn.api.records.Token validNMToken = 241 nmTokenSecretManagerRM.createNMToken(validAppAttemptId, validNode, user); 242 243 org.apache.hadoop.yarn.api.records.Token validContainerToken = 244 containerTokenSecretManager.createContainerToken(validContainerId, 245 validNode, user, r, Priority.newInstance(10), 1234); 246 ContainerTokenIdentifier identifier = 247 BuilderUtils.newContainerTokenIdentifier(validContainerToken); 248 Assert.assertEquals(Priority.newInstance(10), identifier.getPriority()); 249 Assert.assertEquals(1234, identifier.getCreationTime()); 250 251 StringBuilder sb; 252 // testInvalidNMToken ... creating NMToken using different secret manager. 253 254 NMTokenSecretManagerInRM tempManager = new NMTokenSecretManagerInRM(conf); 255 tempManager.rollMasterKey(); 256 do { 257 tempManager.rollMasterKey(); 258 tempManager.activateNextMasterKey(); 259 // Making sure key id is different. 260 } while (tempManager.getCurrentKey().getKeyId() == nmTokenSecretManagerRM 261 .getCurrentKey().getKeyId()); 262 263 // Testing that NM rejects the requests when we don't send any token. 264 if (UserGroupInformation.isSecurityEnabled()) { 265 sb = new StringBuilder("Client cannot authenticate via:[TOKEN]"); 266 } else { 267 sb = 268 new StringBuilder( 269 "SIMPLE authentication is not enabled. Available:[TOKEN]"); 270 } 271 String errorMsg = testStartContainer(rpc, validAppAttemptId, validNode, 272 validContainerToken, null, true); 273 Assert.assertTrue(errorMsg.contains(sb.toString())); 274 275 org.apache.hadoop.yarn.api.records.Token invalidNMToken = 276 tempManager.createNMToken(validAppAttemptId, validNode, user); 277 sb = new StringBuilder("Given NMToken for application : "); 278 sb.append(validAppAttemptId.toString()) 279 .append(" seems to have been generated illegally."); 280 Assert.assertTrue(sb.toString().contains( 281 testStartContainer(rpc, validAppAttemptId, validNode, 282 validContainerToken, invalidNMToken, true))); 283 284 // valid NMToken but belonging to other node 285 invalidNMToken = 286 nmTokenSecretManagerRM.createNMToken(validAppAttemptId, invalidNode, 287 user); 288 sb = new StringBuilder("Given NMToken for application : "); 289 sb.append(validAppAttemptId) 290 .append(" is not valid for current node manager.expected : ") 291 .append(validNode.toString()) 292 .append(" found : ").append(invalidNode.toString()); 293 Assert.assertTrue(sb.toString().contains( 294 testStartContainer(rpc, validAppAttemptId, validNode, 295 validContainerToken, invalidNMToken, true))); 296 297 // using correct tokens. nmtoken for app attempt should get saved. 298 conf.setInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 299 4 * 60 * 1000); 300 validContainerToken = 301 containerTokenSecretManager.createContainerToken(validContainerId, 302 validNode, user, r, Priority.newInstance(0), 0); 303 Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, 304 validContainerToken, validNMToken, false).isEmpty()); 305 Assert.assertTrue(nmTokenSecretManagerNM 306 .isAppAttemptNMTokenKeyPresent(validAppAttemptId)); 307 308 // using a new compatible version nmtoken, expect container can be started 309 // successfully. 310 ApplicationAttemptId validAppAttemptId2 = 311 ApplicationAttemptId.newInstance(appId, 2); 312 313 ContainerId validContainerId2 = 314 ContainerId.newContainerId(validAppAttemptId2, 0); 315 316 org.apache.hadoop.yarn.api.records.Token validContainerToken2 = 317 containerTokenSecretManager.createContainerToken(validContainerId2, 318 validNode, user, r, Priority.newInstance(0), 0); 319 320 org.apache.hadoop.yarn.api.records.Token validNMToken2 = 321 nmTokenSecretManagerRM.createNMToken(validAppAttemptId2, validNode, user); 322 // First, get a new NMTokenIdentifier. 323 NMTokenIdentifier newIdentifier = new NMTokenIdentifier(); 324 byte[] tokenIdentifierContent = validNMToken2.getIdentifier().array(); 325 DataInputBuffer dib = new DataInputBuffer(); 326 dib.reset(tokenIdentifierContent, tokenIdentifierContent.length); 327 newIdentifier.readFields(dib); 328 329 // Then, generate a new version NMTokenIdentifier (NMTokenIdentifierNewForTest) 330 // with additional field of message. 331 NMTokenIdentifierNewForTest newVersionIdentifier = 332 new NMTokenIdentifierNewForTest(newIdentifier, "message"); 333 334 // check new version NMTokenIdentifier has correct info. 335 Assert.assertEquals("The ApplicationAttemptId is changed after set to " + 336 "newVersionIdentifier", validAppAttemptId2.getAttemptId(), 337 newVersionIdentifier.getApplicationAttemptId().getAttemptId() 338 ); 339 340 Assert.assertEquals("The message is changed after set to newVersionIdentifier", 341 "message", newVersionIdentifier.getMessage()); 342 343 Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier", 344 validNode, newVersionIdentifier.getNodeId()); 345 346 // create new Token based on new version NMTokenIdentifier. 347 org.apache.hadoop.yarn.api.records.Token newVersionedNMToken = 348 BaseNMTokenSecretManager.newInstance( 349 nmTokenSecretManagerRM.retrievePassword(newVersionIdentifier), 350 newVersionIdentifier); 351 352 // Verify startContainer is successful and no exception is thrown. 353 Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode, 354 validContainerToken2, newVersionedNMToken, false).isEmpty()); 355 Assert.assertTrue(nmTokenSecretManagerNM 356 .isAppAttemptNMTokenKeyPresent(validAppAttemptId2)); 357 358 //Now lets wait till container finishes and is removed from node manager. 359 waitForContainerToFinishOnNM(validContainerId); 360 sb = new StringBuilder("Attempt to relaunch the same container with id "); 361 sb.append(validContainerId); 362 Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, 363 validContainerToken, validNMToken, true).contains(sb.toString())); 364 365 // Container is removed from node manager's memory by this time. 366 // trying to stop the container. It should not throw any exception. 367 testStopContainer(rpc, validAppAttemptId, validNode, validContainerId, 368 validNMToken, false); 369 370 // Rolling over master key twice so that we can check whether older keys 371 // are used for authentication. 372 rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); 373 // Key rolled over once.. rolling over again 374 rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); 375 376 // trying get container status. Now saved nmToken should be used for 377 // authentication... It should complain saying container was recently 378 // stopped. 379 sb = new StringBuilder("Container "); 380 sb.append(validContainerId); 381 sb.append(" was recently stopped on node manager"); 382 Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, 383 validContainerId, validNMToken, true).contains(sb.toString())); 384 385 // Now lets remove the container from nm-memory 386 nm.getNodeStatusUpdater().clearFinishedContainersFromCache(); 387 388 // This should fail as container is removed from recently tracked finished 389 // containers. 390 sb = new StringBuilder("Container "); 391 sb.append(validContainerId.toString()); 392 sb.append(" is not handled by this NodeManager"); 393 Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, 394 validContainerId, validNMToken, false).contains(sb.toString())); 395 396 // using appAttempt-1 NMtoken for launching container for appAttempt-2 should 397 // succeed. 398 ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(appId, 2); 399 Token attempt1NMToken = 400 nmTokenSecretManagerRM 401 .createNMToken(validAppAttemptId, validNode, user); 402 org.apache.hadoop.yarn.api.records.Token newContainerToken = 403 containerTokenSecretManager.createContainerToken( 404 ContainerId.newContainerId(attempt2, 1), validNode, user, r, 405 Priority.newInstance(0), 0); 406 Assert.assertTrue(testStartContainer(rpc, attempt2, validNode, 407 newContainerToken, attempt1NMToken, false).isEmpty()); 408 } 409 waitForContainerToFinishOnNM(ContainerId containerId)410 private void waitForContainerToFinishOnNM(ContainerId containerId) { 411 Context nmContet = yarnCluster.getNodeManager(0).getNMContext(); 412 int interval = 4 * 60; // Max time for container token to expire. 413 Assert.assertNotNull(nmContet.getContainers().containsKey(containerId)); 414 while ((interval-- > 0) 415 && !nmContet.getContainers().get(containerId) 416 .cloneAndGetContainerStatus().getState() 417 .equals(ContainerState.COMPLETE)) { 418 try { 419 LOG.info("Waiting for " + containerId + " to complete."); 420 Thread.sleep(1000); 421 } catch (InterruptedException e) { 422 } 423 } 424 // Normally, Containers will be removed from NM context after they are 425 // explicitly acked by RM. Now, manually remove it for testing. 426 yarnCluster.getNodeManager(0).getNodeStatusUpdater() 427 .addCompletedContainer(containerId); 428 nmContet.getContainers().remove(containerId); 429 } 430 waitForNMToReceiveNMTokenKey( NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)431 protected void waitForNMToReceiveNMTokenKey( 432 NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm) 433 throws InterruptedException { 434 int attempt = 60; 435 ContainerManagerImpl cm = 436 ((ContainerManagerImpl) nm.getNMContext().getContainerManager()); 437 while ((cm.getBlockNewContainerRequestsStatus() || nmTokenSecretManagerNM 438 .getNodeId() == null) && attempt-- > 0) { 439 Thread.sleep(2000); 440 } 441 } 442 rollNMTokenMasterKey( NMTokenSecretManagerInRM nmTokenSecretManagerRM, NMTokenSecretManagerInNM nmTokenSecretManagerNM)443 protected void rollNMTokenMasterKey( 444 NMTokenSecretManagerInRM nmTokenSecretManagerRM, 445 NMTokenSecretManagerInNM nmTokenSecretManagerNM) throws Exception { 446 int oldKeyId = nmTokenSecretManagerRM.getCurrentKey().getKeyId(); 447 nmTokenSecretManagerRM.rollMasterKey(); 448 int interval = 40; 449 while (nmTokenSecretManagerNM.getCurrentKey().getKeyId() == oldKeyId 450 && interval-- > 0) { 451 Thread.sleep(1000); 452 } 453 nmTokenSecretManagerRM.activateNextMasterKey(); 454 Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId() 455 == nmTokenSecretManagerRM.getCurrentKey().getKeyId())); 456 } 457 testStopContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, ContainerId containerId, Token nmToken, boolean isExceptionExpected)458 private String testStopContainer(YarnRPC rpc, 459 ApplicationAttemptId appAttemptId, NodeId nodeId, 460 ContainerId containerId, Token nmToken, boolean isExceptionExpected) { 461 try { 462 stopContainer(rpc, nmToken, 463 Arrays.asList(new ContainerId[] { containerId }), appAttemptId, 464 nodeId); 465 if (isExceptionExpected) { 466 fail("Exception was expected!!"); 467 } 468 return ""; 469 } catch (Exception e) { 470 e.printStackTrace(); 471 return e.getMessage(); 472 } 473 } 474 testGetContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, ContainerId containerId, org.apache.hadoop.yarn.api.records.Token nmToken, boolean isExceptionExpected)475 private String testGetContainer(YarnRPC rpc, 476 ApplicationAttemptId appAttemptId, NodeId nodeId, 477 ContainerId containerId, 478 org.apache.hadoop.yarn.api.records.Token nmToken, 479 boolean isExceptionExpected) { 480 try { 481 getContainerStatus(rpc, nmToken, containerId, appAttemptId, nodeId, 482 isExceptionExpected); 483 if (isExceptionExpected) { 484 fail("Exception was expected!!"); 485 } 486 return ""; 487 } catch (Exception e) { 488 e.printStackTrace(); 489 return e.getMessage(); 490 } 491 } 492 testStartContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, org.apache.hadoop.yarn.api.records.Token containerToken, org.apache.hadoop.yarn.api.records.Token nmToken, boolean isExceptionExpected)493 private String testStartContainer(YarnRPC rpc, 494 ApplicationAttemptId appAttemptId, NodeId nodeId, 495 org.apache.hadoop.yarn.api.records.Token containerToken, 496 org.apache.hadoop.yarn.api.records.Token nmToken, 497 boolean isExceptionExpected) { 498 try { 499 startContainer(rpc, nmToken, containerToken, nodeId, 500 appAttemptId.toString()); 501 if (isExceptionExpected){ 502 fail("Exception was expected!!"); 503 } 504 return ""; 505 } catch (Exception e) { 506 e.printStackTrace(); 507 return e.getMessage(); 508 } 509 } 510 stopContainer(YarnRPC rpc, Token nmToken, List<ContainerId> containerId, ApplicationAttemptId appAttemptId, NodeId nodeId)511 private void stopContainer(YarnRPC rpc, Token nmToken, 512 List<ContainerId> containerId, ApplicationAttemptId appAttemptId, 513 NodeId nodeId) throws Exception { 514 StopContainersRequest request = 515 StopContainersRequest.newInstance(containerId); 516 ContainerManagementProtocol proxy = null; 517 try { 518 proxy = 519 getContainerManagementProtocolProxy(rpc, nmToken, nodeId, 520 appAttemptId.toString()); 521 StopContainersResponse response = proxy.stopContainers(request); 522 if (response.getFailedRequests() != null && 523 response.getFailedRequests().containsKey(containerId)) { 524 parseAndThrowException(response.getFailedRequests().get(containerId) 525 .deSerialize()); 526 } 527 } catch (Exception e) { 528 if (proxy != null) { 529 rpc.stopProxy(proxy, conf); 530 } 531 } 532 } 533 534 private void getContainerStatus(YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, ContainerId containerId, ApplicationAttemptId appAttemptId, NodeId nodeId, boolean isExceptionExpected)535 getContainerStatus(YarnRPC rpc, 536 org.apache.hadoop.yarn.api.records.Token nmToken, 537 ContainerId containerId, 538 ApplicationAttemptId appAttemptId, NodeId nodeId, 539 boolean isExceptionExpected) throws Exception { 540 List<ContainerId> containerIds = new ArrayList<ContainerId>(); 541 containerIds.add(containerId); 542 GetContainerStatusesRequest request = 543 GetContainerStatusesRequest.newInstance(containerIds); 544 ContainerManagementProtocol proxy = null; 545 try { 546 proxy = 547 getContainerManagementProtocolProxy(rpc, nmToken, nodeId, 548 appAttemptId.toString()); 549 GetContainerStatusesResponse statuses = proxy.getContainerStatuses(request); 550 if (statuses.getFailedRequests() != null 551 && statuses.getFailedRequests().containsKey(containerId)) { 552 parseAndThrowException(statuses.getFailedRequests().get(containerId) 553 .deSerialize()); 554 } 555 } finally { 556 if (proxy != null) { 557 rpc.stopProxy(proxy, conf); 558 } 559 } 560 } 561 startContainer(final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, org.apache.hadoop.yarn.api.records.Token containerToken, NodeId nodeId, String user)562 private void startContainer(final YarnRPC rpc, 563 org.apache.hadoop.yarn.api.records.Token nmToken, 564 org.apache.hadoop.yarn.api.records.Token containerToken, 565 NodeId nodeId, String user) throws Exception { 566 567 ContainerLaunchContext context = 568 Records.newRecord(ContainerLaunchContext.class); 569 StartContainerRequest scRequest = 570 StartContainerRequest.newInstance(context,containerToken); 571 List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); 572 list.add(scRequest); 573 StartContainersRequest allRequests = 574 StartContainersRequest.newInstance(list); 575 ContainerManagementProtocol proxy = null; 576 try { 577 proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user); 578 StartContainersResponse response = proxy.startContainers(allRequests); 579 for(SerializedException ex : response.getFailedRequests().values()){ 580 parseAndThrowException(ex.deSerialize()); 581 } 582 } finally { 583 if (proxy != null) { 584 rpc.stopProxy(proxy, conf); 585 } 586 } 587 } 588 parseAndThrowException(Throwable t)589 private void parseAndThrowException(Throwable t) throws YarnException, 590 IOException { 591 if (t instanceof YarnException) { 592 throw (YarnException) t; 593 } else if (t instanceof InvalidToken) { 594 throw (InvalidToken) t; 595 } else { 596 throw (IOException) t; 597 } 598 } 599 getContainerManagementProtocolProxy( final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, NodeId nodeId, String user)600 protected ContainerManagementProtocol getContainerManagementProtocolProxy( 601 final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, 602 NodeId nodeId, String user) { 603 ContainerManagementProtocol proxy; 604 UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); 605 final InetSocketAddress addr = 606 NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort()); 607 if (nmToken != null) { 608 ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr)); 609 } 610 proxy = 611 NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi, 612 rpc, addr); 613 return proxy; 614 } 615 616 /** 617 * This tests a malice user getting a proper token but then messing with it by 618 * tampering with containerID/Resource etc.. His/her containers should be 619 * rejected. 620 * 621 * @throws IOException 622 * @throws InterruptedException 623 * @throws YarnException 624 */ testContainerToken(Configuration conf)625 private void testContainerToken(Configuration conf) throws IOException, 626 InterruptedException, YarnException { 627 628 LOG.info("Running test for malice user"); 629 /* 630 * We need to check for containerToken (authorization). 631 * Here we will be assuming that we have valid NMToken 632 * 1) ContainerToken used is expired. 633 * 2) ContainerToken is tampered (resource is modified). 634 */ 635 NMTokenSecretManagerInRM nmTokenSecretManagerInRM = 636 yarnCluster.getResourceManager().getRMContext() 637 .getNMTokenSecretManager(); 638 ApplicationId appId = ApplicationId.newInstance(1, 1); 639 ApplicationAttemptId appAttemptId = 640 ApplicationAttemptId.newInstance(appId, 0); 641 ContainerId cId = ContainerId.newContainerId(appAttemptId, 0); 642 NodeManager nm = yarnCluster.getNodeManager(0); 643 NMTokenSecretManagerInNM nmTokenSecretManagerInNM = 644 nm.getNMContext().getNMTokenSecretManager(); 645 String user = "test"; 646 647 waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm); 648 649 NodeId nodeId = nm.getNMContext().getNodeId(); 650 651 // Both id should be equal. 652 Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(), 653 nmTokenSecretManagerInRM.getCurrentKey().getKeyId()); 654 655 656 RMContainerTokenSecretManager containerTokenSecretManager = 657 yarnCluster.getResourceManager().getRMContext(). 658 getContainerTokenSecretManager(); 659 660 Resource r = Resource.newInstance(1230, 2); 661 662 Token containerToken = 663 containerTokenSecretManager.createContainerToken( 664 cId, nodeId, user, r, Priority.newInstance(0), 0); 665 666 ContainerTokenIdentifier containerTokenIdentifier = 667 getContainerTokenIdentifierFromToken(containerToken); 668 669 // Verify new compatible version ContainerTokenIdentifier can work successfully. 670 ContainerTokenIdentifierForTest newVersionTokenIdentifier = 671 new ContainerTokenIdentifierForTest(containerTokenIdentifier, "message"); 672 byte[] password = 673 containerTokenSecretManager.createPassword(newVersionTokenIdentifier); 674 675 Token newContainerToken = BuilderUtils.newContainerToken( 676 nodeId, password, newVersionTokenIdentifier); 677 678 Token nmToken = 679 nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user); 680 YarnRPC rpc = YarnRPC.create(conf); 681 Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, 682 newContainerToken, nmToken, false).isEmpty()); 683 684 // Creating a tampered Container Token 685 RMContainerTokenSecretManager tamperedContainerTokenSecretManager = 686 new RMContainerTokenSecretManager(conf); 687 tamperedContainerTokenSecretManager.rollMasterKey(); 688 do { 689 tamperedContainerTokenSecretManager.rollMasterKey(); 690 tamperedContainerTokenSecretManager.activateNextMasterKey(); 691 } while (containerTokenSecretManager.getCurrentKey().getKeyId() 692 == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId()); 693 694 ContainerId cId2 = ContainerId.newContainerId(appAttemptId, 1); 695 // Creating modified containerToken 696 Token containerToken2 = 697 tamperedContainerTokenSecretManager.createContainerToken(cId2, nodeId, 698 user, r, Priority.newInstance(0), 0); 699 700 StringBuilder sb = new StringBuilder("Given Container "); 701 sb.append(cId2); 702 sb.append(" seems to have an illegally generated token."); 703 Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, 704 containerToken2, nmToken, true).contains(sb.toString())); 705 } 706 getContainerTokenIdentifierFromToken( Token containerToken)707 private ContainerTokenIdentifier getContainerTokenIdentifierFromToken( 708 Token containerToken) throws IOException { 709 ContainerTokenIdentifier containerTokenIdentifier; 710 containerTokenIdentifier = new ContainerTokenIdentifier(); 711 byte[] tokenIdentifierContent = containerToken.getIdentifier().array(); 712 DataInputBuffer dib = new DataInputBuffer(); 713 dib.reset(tokenIdentifierContent, tokenIdentifierContent.length); 714 containerTokenIdentifier.readFields(dib); 715 return containerTokenIdentifier; 716 } 717 718 /** 719 * This tests whether a containerId is serialized/deserialized with epoch. 720 * 721 * @throws IOException 722 * @throws InterruptedException 723 * @throws YarnException 724 */ testContainerTokenWithEpoch(Configuration conf)725 private void testContainerTokenWithEpoch(Configuration conf) 726 throws IOException, InterruptedException, YarnException { 727 728 LOG.info("Running test for serializing/deserializing containerIds"); 729 730 NMTokenSecretManagerInRM nmTokenSecretManagerInRM = 731 yarnCluster.getResourceManager().getRMContext() 732 .getNMTokenSecretManager(); 733 ApplicationId appId = ApplicationId.newInstance(1, 1); 734 ApplicationAttemptId appAttemptId = 735 ApplicationAttemptId.newInstance(appId, 0); 736 ContainerId cId = ContainerId.newContainerId(appAttemptId, (5L << 40) | 3L); 737 NodeManager nm = yarnCluster.getNodeManager(0); 738 NMTokenSecretManagerInNM nmTokenSecretManagerInNM = 739 nm.getNMContext().getNMTokenSecretManager(); 740 String user = "test"; 741 742 waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm); 743 744 NodeId nodeId = nm.getNMContext().getNodeId(); 745 746 // Both id should be equal. 747 Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(), 748 nmTokenSecretManagerInRM.getCurrentKey().getKeyId()); 749 750 // Creating a normal Container Token 751 RMContainerTokenSecretManager containerTokenSecretManager = 752 yarnCluster.getResourceManager().getRMContext(). 753 getContainerTokenSecretManager(); 754 Resource r = Resource.newInstance(1230, 2); 755 Token containerToken = 756 containerTokenSecretManager.createContainerToken(cId, nodeId, user, r, 757 Priority.newInstance(0), 0); 758 759 ContainerTokenIdentifier containerTokenIdentifier = 760 new ContainerTokenIdentifier(); 761 byte[] tokenIdentifierContent = containerToken.getIdentifier().array(); 762 DataInputBuffer dib = new DataInputBuffer(); 763 dib.reset(tokenIdentifierContent, tokenIdentifierContent.length); 764 containerTokenIdentifier.readFields(dib); 765 766 767 Assert.assertEquals(cId, containerTokenIdentifier.getContainerID()); 768 Assert.assertEquals( 769 cId.toString(), containerTokenIdentifier.getContainerID().toString()); 770 771 Token nmToken = 772 nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user); 773 774 YarnRPC rpc = YarnRPC.create(conf); 775 testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken, 776 false); 777 778 List<ContainerId> containerIds = new LinkedList<ContainerId>(); 779 containerIds.add(cId); 780 ContainerManagementProtocol proxy 781 = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user); 782 GetContainerStatusesResponse res = proxy.getContainerStatuses( 783 GetContainerStatusesRequest.newInstance(containerIds)); 784 Assert.assertNotNull(res.getContainerStatuses().get(0)); 785 Assert.assertEquals( 786 cId, res.getContainerStatuses().get(0).getContainerId()); 787 Assert.assertEquals(cId.toString(), 788 res.getContainerStatuses().get(0).getContainerId().toString()); 789 } 790 } 791