1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server;
20 
21 import static org.junit.Assert.fail;
22 
23 import java.io.File;
24 import java.io.IOException;
25 import java.net.InetSocketAddress;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
36 import org.apache.hadoop.io.DataInputBuffer;
37 import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
38 import org.apache.hadoop.net.NetUtils;
39 import org.apache.hadoop.security.UserGroupInformation;
40 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
41 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
42 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
43 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
44 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
45 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
46 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
47 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
48 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
49 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
50 import org.apache.hadoop.yarn.api.records.ApplicationId;
51 import org.apache.hadoop.yarn.api.records.ContainerId;
52 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
53 import org.apache.hadoop.yarn.api.records.ContainerState;
54 import org.apache.hadoop.yarn.api.records.NodeId;
55 import org.apache.hadoop.yarn.api.records.Priority;
56 import org.apache.hadoop.yarn.api.records.Resource;
57 import org.apache.hadoop.yarn.api.records.SerializedException;
58 import org.apache.hadoop.yarn.api.records.Token;
59 import org.apache.hadoop.yarn.client.NMProxy;
60 import org.apache.hadoop.yarn.conf.YarnConfiguration;
61 import org.apache.hadoop.yarn.exceptions.YarnException;
62 import org.apache.hadoop.yarn.factories.RecordFactory;
63 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
64 import org.apache.hadoop.yarn.ipc.YarnRPC;
65 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
66 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
67 import org.apache.hadoop.yarn.server.nodemanager.Context;
68 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
69 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
70 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
71 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
72 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
73 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
74 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
75 import org.apache.hadoop.yarn.util.ConverterUtils;
76 import org.apache.hadoop.yarn.util.Records;
77 import org.junit.After;
78 import org.junit.Assert;
79 import org.junit.Before;
80 import org.junit.Test;
81 import org.junit.runner.RunWith;
82 import org.junit.runners.Parameterized;
83 import org.junit.runners.Parameterized.Parameters;
84 
85 import com.google.common.io.ByteArrayDataInput;
86 import com.google.common.io.ByteStreams;
87 
88 @RunWith(Parameterized.class)
89 public class TestContainerManagerSecurity extends KerberosSecurityTestcase {
90 
91   static Log LOG = LogFactory.getLog(TestContainerManagerSecurity.class);
92   static final RecordFactory recordFactory = RecordFactoryProvider
93       .getRecordFactory(null);
94   private static MiniYARNCluster yarnCluster;
95   private static final File testRootDir = new File("target",
96     TestContainerManagerSecurity.class.getName() + "-root");
97   private static File httpSpnegoKeytabFile = new File(testRootDir,
98     "httpSpnegoKeytabFile.keytab");
99   private static String httpSpnegoPrincipal = "HTTP/localhost@EXAMPLE.COM";
100 
101   private Configuration conf;
102 
103   @Before
setUp()104   public void setUp() throws Exception {
105     testRootDir.mkdirs();
106     httpSpnegoKeytabFile.deleteOnExit();
107     getKdc().createPrincipal(httpSpnegoKeytabFile, httpSpnegoPrincipal);
108   }
109 
110   @After
tearDown()111   public void tearDown() {
112     testRootDir.delete();
113   }
114 
115   @Parameters
configs()116   public static Collection<Object[]> configs() {
117     Configuration configurationWithoutSecurity = new Configuration();
118     configurationWithoutSecurity.set(
119         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
120 
121     Configuration configurationWithSecurity = new Configuration();
122     configurationWithSecurity.set(
123       CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
124     configurationWithSecurity.set(
125       YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
126     configurationWithSecurity.set(
127       YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
128       httpSpnegoKeytabFile.getAbsolutePath());
129     configurationWithSecurity.set(
130       YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, httpSpnegoPrincipal);
131     configurationWithSecurity.set(
132       YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY,
133       httpSpnegoKeytabFile.getAbsolutePath());
134 
135     return Arrays.asList(new Object[][] { { configurationWithoutSecurity },
136         { configurationWithSecurity } });
137   }
138 
TestContainerManagerSecurity(Configuration conf)139   public TestContainerManagerSecurity(Configuration conf) {
140     conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 100000L);
141     UserGroupInformation.setConfiguration(conf);
142     this.conf = conf;
143   }
144 
145   @Test (timeout = 120000)
testContainerManager()146   public void testContainerManager() throws Exception {
147     try {
148       yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
149           .getName(), 1, 1, 1);
150       yarnCluster.init(conf);
151       yarnCluster.start();
152 
153       // TestNMTokens.
154       testNMTokens(conf);
155 
156       // Testing for container token tampering
157       testContainerToken(conf);
158 
159     } catch (Exception e) {
160       e.printStackTrace();
161       throw e;
162     } finally {
163       if (yarnCluster != null) {
164         yarnCluster.stop();
165         yarnCluster = null;
166       }
167     }
168   }
169 
170   @Test (timeout = 120000)
testContainerManagerWithEpoch()171   public void testContainerManagerWithEpoch() throws Exception {
172     try {
173       yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
174           .getName(), 1, 1, 1);
175       yarnCluster.init(conf);
176       yarnCluster.start();
177 
178       // Testing for container token tampering
179       testContainerTokenWithEpoch(conf);
180 
181     } finally {
182       if (yarnCluster != null) {
183         yarnCluster.stop();
184         yarnCluster = null;
185       }
186     }
187   }
188 
testNMTokens(Configuration conf)189   private void testNMTokens(Configuration conf) throws Exception {
190     NMTokenSecretManagerInRM nmTokenSecretManagerRM =
191         yarnCluster.getResourceManager().getRMContext()
192           .getNMTokenSecretManager();
193     NMTokenSecretManagerInNM nmTokenSecretManagerNM =
194         yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager();
195     RMContainerTokenSecretManager containerTokenSecretManager =
196         yarnCluster.getResourceManager().getRMContext().
197             getContainerTokenSecretManager();
198 
199     NodeManager nm = yarnCluster.getNodeManager(0);
200 
201     waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM, nm);
202 
203     // Both id should be equal.
204     Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(),
205         nmTokenSecretManagerRM.getCurrentKey().getKeyId());
206 
207     /*
208      * Below cases should be tested.
209      * 1) If Invalid NMToken is used then it should be rejected.
210      * 2) If valid NMToken but belonging to another Node is used then that
211      * too should be rejected.
212      * 3) NMToken for say appAttempt-1 is used for starting/stopping/retrieving
213      * status for container with containerId for say appAttempt-2 should
214      * be rejected.
215      * 4) After start container call is successful nmtoken should have been
216      * saved in NMTokenSecretManagerInNM.
217      * 5) If start container call was successful (no matter if container is
218      * still running or not), appAttempt->NMToken should be present in
219      * NMTokenSecretManagerInNM's cache. Any future getContainerStatus call
220      * for containerId belonging to that application attempt using
221      * applicationAttempt's older nmToken should not get any invalid
222      * nmToken error. (This can be best tested if we roll over NMToken
223      * master key twice).
224      */
225     YarnRPC rpc = YarnRPC.create(conf);
226     String user = "test";
227     Resource r = Resource.newInstance(1024, 1);
228 
229     ApplicationId appId = ApplicationId.newInstance(1, 1);
230     ApplicationAttemptId validAppAttemptId =
231         ApplicationAttemptId.newInstance(appId, 1);
232 
233     ContainerId validContainerId =
234         ContainerId.newContainerId(validAppAttemptId, 0);
235 
236     NodeId validNode = yarnCluster.getNodeManager(0).getNMContext().getNodeId();
237     NodeId invalidNode = NodeId.newInstance("InvalidHost", 1234);
238 
239 
240     org.apache.hadoop.yarn.api.records.Token validNMToken =
241         nmTokenSecretManagerRM.createNMToken(validAppAttemptId, validNode, user);
242 
243     org.apache.hadoop.yarn.api.records.Token validContainerToken =
244         containerTokenSecretManager.createContainerToken(validContainerId,
245             validNode, user, r, Priority.newInstance(10), 1234);
246     ContainerTokenIdentifier identifier =
247         BuilderUtils.newContainerTokenIdentifier(validContainerToken);
248     Assert.assertEquals(Priority.newInstance(10), identifier.getPriority());
249     Assert.assertEquals(1234, identifier.getCreationTime());
250 
251     StringBuilder sb;
252     // testInvalidNMToken ... creating NMToken using different secret manager.
253 
254     NMTokenSecretManagerInRM tempManager = new NMTokenSecretManagerInRM(conf);
255     tempManager.rollMasterKey();
256     do {
257       tempManager.rollMasterKey();
258       tempManager.activateNextMasterKey();
259       // Making sure key id is different.
260     } while (tempManager.getCurrentKey().getKeyId() == nmTokenSecretManagerRM
261         .getCurrentKey().getKeyId());
262 
263     // Testing that NM rejects the requests when we don't send any token.
264     if (UserGroupInformation.isSecurityEnabled()) {
265       sb = new StringBuilder("Client cannot authenticate via:[TOKEN]");
266     } else {
267       sb =
268           new StringBuilder(
269               "SIMPLE authentication is not enabled.  Available:[TOKEN]");
270     }
271     String errorMsg = testStartContainer(rpc, validAppAttemptId, validNode,
272         validContainerToken, null, true);
273     Assert.assertTrue(errorMsg.contains(sb.toString()));
274 
275     org.apache.hadoop.yarn.api.records.Token invalidNMToken =
276         tempManager.createNMToken(validAppAttemptId, validNode, user);
277     sb = new StringBuilder("Given NMToken for application : ");
278     sb.append(validAppAttemptId.toString())
279       .append(" seems to have been generated illegally.");
280     Assert.assertTrue(sb.toString().contains(
281         testStartContainer(rpc, validAppAttemptId, validNode,
282             validContainerToken, invalidNMToken, true)));
283 
284     // valid NMToken but belonging to other node
285     invalidNMToken =
286         nmTokenSecretManagerRM.createNMToken(validAppAttemptId, invalidNode,
287             user);
288     sb = new StringBuilder("Given NMToken for application : ");
289     sb.append(validAppAttemptId)
290       .append(" is not valid for current node manager.expected : ")
291       .append(validNode.toString())
292       .append(" found : ").append(invalidNode.toString());
293     Assert.assertTrue(sb.toString().contains(
294         testStartContainer(rpc, validAppAttemptId, validNode,
295             validContainerToken, invalidNMToken, true)));
296 
297     // using correct tokens. nmtoken for app attempt should get saved.
298     conf.setInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
299         4 * 60 * 1000);
300     validContainerToken =
301         containerTokenSecretManager.createContainerToken(validContainerId,
302             validNode, user, r, Priority.newInstance(0), 0);
303     Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
304       validContainerToken, validNMToken, false).isEmpty());
305     Assert.assertTrue(nmTokenSecretManagerNM
306         .isAppAttemptNMTokenKeyPresent(validAppAttemptId));
307 
308     // using a new compatible version nmtoken, expect container can be started
309     // successfully.
310     ApplicationAttemptId validAppAttemptId2 =
311         ApplicationAttemptId.newInstance(appId, 2);
312 
313     ContainerId validContainerId2 =
314         ContainerId.newContainerId(validAppAttemptId2, 0);
315 
316     org.apache.hadoop.yarn.api.records.Token validContainerToken2 =
317         containerTokenSecretManager.createContainerToken(validContainerId2,
318             validNode, user, r, Priority.newInstance(0), 0);
319 
320     org.apache.hadoop.yarn.api.records.Token validNMToken2 =
321         nmTokenSecretManagerRM.createNMToken(validAppAttemptId2, validNode, user);
322     // First, get a new NMTokenIdentifier.
323     NMTokenIdentifier newIdentifier = new NMTokenIdentifier();
324     byte[] tokenIdentifierContent = validNMToken2.getIdentifier().array();
325     DataInputBuffer dib = new DataInputBuffer();
326     dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
327     newIdentifier.readFields(dib);
328 
329     // Then, generate a new version NMTokenIdentifier (NMTokenIdentifierNewForTest)
330     // with additional field of message.
331     NMTokenIdentifierNewForTest newVersionIdentifier =
332         new NMTokenIdentifierNewForTest(newIdentifier, "message");
333 
334     // check new version NMTokenIdentifier has correct info.
335     Assert.assertEquals("The ApplicationAttemptId is changed after set to " +
336         "newVersionIdentifier", validAppAttemptId2.getAttemptId(),
337         newVersionIdentifier.getApplicationAttemptId().getAttemptId()
338     );
339 
340     Assert.assertEquals("The message is changed after set to newVersionIdentifier",
341         "message", newVersionIdentifier.getMessage());
342 
343     Assert.assertEquals("The NodeId is changed after set to newVersionIdentifier",
344         validNode, newVersionIdentifier.getNodeId());
345 
346     // create new Token based on new version NMTokenIdentifier.
347     org.apache.hadoop.yarn.api.records.Token newVersionedNMToken =
348         BaseNMTokenSecretManager.newInstance(
349             nmTokenSecretManagerRM.retrievePassword(newVersionIdentifier),
350             newVersionIdentifier);
351 
352     // Verify startContainer is successful and no exception is thrown.
353     Assert.assertTrue(testStartContainer(rpc, validAppAttemptId2, validNode,
354         validContainerToken2, newVersionedNMToken, false).isEmpty());
355     Assert.assertTrue(nmTokenSecretManagerNM
356         .isAppAttemptNMTokenKeyPresent(validAppAttemptId2));
357 
358     //Now lets wait till container finishes and is removed from node manager.
359     waitForContainerToFinishOnNM(validContainerId);
360     sb = new StringBuilder("Attempt to relaunch the same container with id ");
361     sb.append(validContainerId);
362     Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
363         validContainerToken, validNMToken, true).contains(sb.toString()));
364 
365     // Container is removed from node manager's memory by this time.
366     // trying to stop the container. It should not throw any exception.
367     testStopContainer(rpc, validAppAttemptId, validNode, validContainerId,
368         validNMToken, false);
369 
370     // Rolling over master key twice so that we can check whether older keys
371     // are used for authentication.
372     rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
373     // Key rolled over once.. rolling over again
374     rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
375 
376     // trying get container status. Now saved nmToken should be used for
377     // authentication... It should complain saying container was recently
378     // stopped.
379     sb = new StringBuilder("Container ");
380     sb.append(validContainerId);
381     sb.append(" was recently stopped on node manager");
382     Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
383         validContainerId, validNMToken, true).contains(sb.toString()));
384 
385     // Now lets remove the container from nm-memory
386     nm.getNodeStatusUpdater().clearFinishedContainersFromCache();
387 
388     // This should fail as container is removed from recently tracked finished
389     // containers.
390     sb = new StringBuilder("Container ");
391     sb.append(validContainerId.toString());
392     sb.append(" is not handled by this NodeManager");
393     Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode,
394         validContainerId, validNMToken, false).contains(sb.toString()));
395 
396     // using appAttempt-1 NMtoken for launching container for appAttempt-2 should
397     // succeed.
398     ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(appId, 2);
399     Token attempt1NMToken =
400         nmTokenSecretManagerRM
401           .createNMToken(validAppAttemptId, validNode, user);
402     org.apache.hadoop.yarn.api.records.Token newContainerToken =
403         containerTokenSecretManager.createContainerToken(
404           ContainerId.newContainerId(attempt2, 1), validNode, user, r,
405             Priority.newInstance(0), 0);
406     Assert.assertTrue(testStartContainer(rpc, attempt2, validNode,
407       newContainerToken, attempt1NMToken, false).isEmpty());
408   }
409 
waitForContainerToFinishOnNM(ContainerId containerId)410   private void waitForContainerToFinishOnNM(ContainerId containerId) {
411     Context nmContet = yarnCluster.getNodeManager(0).getNMContext();
412     int interval = 4 * 60; // Max time for container token to expire.
413     Assert.assertNotNull(nmContet.getContainers().containsKey(containerId));
414     while ((interval-- > 0)
415         && !nmContet.getContainers().get(containerId)
416           .cloneAndGetContainerStatus().getState()
417           .equals(ContainerState.COMPLETE)) {
418       try {
419         LOG.info("Waiting for " + containerId + " to complete.");
420         Thread.sleep(1000);
421       } catch (InterruptedException e) {
422       }
423     }
424     // Normally, Containers will be removed from NM context after they are
425     // explicitly acked by RM. Now, manually remove it for testing.
426     yarnCluster.getNodeManager(0).getNodeStatusUpdater()
427       .addCompletedContainer(containerId);
428     nmContet.getContainers().remove(containerId);
429   }
430 
waitForNMToReceiveNMTokenKey( NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)431   protected void waitForNMToReceiveNMTokenKey(
432       NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)
433       throws InterruptedException {
434     int attempt = 60;
435     ContainerManagerImpl cm =
436         ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
437     while ((cm.getBlockNewContainerRequestsStatus() || nmTokenSecretManagerNM
438         .getNodeId() == null) && attempt-- > 0) {
439       Thread.sleep(2000);
440     }
441   }
442 
rollNMTokenMasterKey( NMTokenSecretManagerInRM nmTokenSecretManagerRM, NMTokenSecretManagerInNM nmTokenSecretManagerNM)443   protected void rollNMTokenMasterKey(
444       NMTokenSecretManagerInRM nmTokenSecretManagerRM,
445       NMTokenSecretManagerInNM nmTokenSecretManagerNM) throws Exception {
446     int oldKeyId = nmTokenSecretManagerRM.getCurrentKey().getKeyId();
447     nmTokenSecretManagerRM.rollMasterKey();
448     int interval = 40;
449     while (nmTokenSecretManagerNM.getCurrentKey().getKeyId() == oldKeyId
450         && interval-- > 0) {
451       Thread.sleep(1000);
452     }
453     nmTokenSecretManagerRM.activateNextMasterKey();
454     Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId()
455         == nmTokenSecretManagerRM.getCurrentKey().getKeyId()));
456   }
457 
testStopContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, ContainerId containerId, Token nmToken, boolean isExceptionExpected)458   private String testStopContainer(YarnRPC rpc,
459       ApplicationAttemptId appAttemptId, NodeId nodeId,
460       ContainerId containerId, Token nmToken, boolean isExceptionExpected) {
461     try {
462       stopContainer(rpc, nmToken,
463           Arrays.asList(new ContainerId[] { containerId }), appAttemptId,
464           nodeId);
465       if (isExceptionExpected) {
466         fail("Exception was expected!!");
467       }
468       return "";
469     } catch (Exception e) {
470       e.printStackTrace();
471       return e.getMessage();
472     }
473   }
474 
testGetContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, ContainerId containerId, org.apache.hadoop.yarn.api.records.Token nmToken, boolean isExceptionExpected)475   private String testGetContainer(YarnRPC rpc,
476       ApplicationAttemptId appAttemptId, NodeId nodeId,
477       ContainerId containerId,
478       org.apache.hadoop.yarn.api.records.Token nmToken,
479       boolean isExceptionExpected) {
480     try {
481       getContainerStatus(rpc, nmToken, containerId, appAttemptId, nodeId,
482           isExceptionExpected);
483       if (isExceptionExpected) {
484         fail("Exception was expected!!");
485       }
486       return "";
487     } catch (Exception e) {
488       e.printStackTrace();
489       return e.getMessage();
490     }
491   }
492 
testStartContainer(YarnRPC rpc, ApplicationAttemptId appAttemptId, NodeId nodeId, org.apache.hadoop.yarn.api.records.Token containerToken, org.apache.hadoop.yarn.api.records.Token nmToken, boolean isExceptionExpected)493   private String testStartContainer(YarnRPC rpc,
494       ApplicationAttemptId appAttemptId, NodeId nodeId,
495       org.apache.hadoop.yarn.api.records.Token containerToken,
496       org.apache.hadoop.yarn.api.records.Token nmToken,
497       boolean isExceptionExpected) {
498     try {
499       startContainer(rpc, nmToken, containerToken, nodeId,
500           appAttemptId.toString());
501       if (isExceptionExpected){
502         fail("Exception was expected!!");
503       }
504       return "";
505     } catch (Exception e) {
506       e.printStackTrace();
507       return e.getMessage();
508     }
509   }
510 
stopContainer(YarnRPC rpc, Token nmToken, List<ContainerId> containerId, ApplicationAttemptId appAttemptId, NodeId nodeId)511   private void stopContainer(YarnRPC rpc, Token nmToken,
512       List<ContainerId> containerId, ApplicationAttemptId appAttemptId,
513       NodeId nodeId) throws Exception {
514     StopContainersRequest request =
515         StopContainersRequest.newInstance(containerId);
516     ContainerManagementProtocol proxy = null;
517     try {
518       proxy =
519           getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
520               appAttemptId.toString());
521       StopContainersResponse response = proxy.stopContainers(request);
522       if (response.getFailedRequests() != null &&
523           response.getFailedRequests().containsKey(containerId)) {
524         parseAndThrowException(response.getFailedRequests().get(containerId)
525             .deSerialize());
526       }
527     } catch (Exception e) {
528       if (proxy != null) {
529         rpc.stopProxy(proxy, conf);
530       }
531     }
532   }
533 
534   private void
getContainerStatus(YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, ContainerId containerId, ApplicationAttemptId appAttemptId, NodeId nodeId, boolean isExceptionExpected)535       getContainerStatus(YarnRPC rpc,
536           org.apache.hadoop.yarn.api.records.Token nmToken,
537           ContainerId containerId,
538           ApplicationAttemptId appAttemptId, NodeId nodeId,
539           boolean isExceptionExpected) throws Exception {
540     List<ContainerId> containerIds = new ArrayList<ContainerId>();
541     containerIds.add(containerId);
542     GetContainerStatusesRequest request =
543         GetContainerStatusesRequest.newInstance(containerIds);
544     ContainerManagementProtocol proxy = null;
545     try {
546       proxy =
547           getContainerManagementProtocolProxy(rpc, nmToken, nodeId,
548               appAttemptId.toString());
549       GetContainerStatusesResponse statuses = proxy.getContainerStatuses(request);
550       if (statuses.getFailedRequests() != null
551           && statuses.getFailedRequests().containsKey(containerId)) {
552         parseAndThrowException(statuses.getFailedRequests().get(containerId)
553           .deSerialize());
554       }
555     } finally {
556       if (proxy != null) {
557         rpc.stopProxy(proxy, conf);
558       }
559     }
560   }
561 
startContainer(final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, org.apache.hadoop.yarn.api.records.Token containerToken, NodeId nodeId, String user)562   private void startContainer(final YarnRPC rpc,
563       org.apache.hadoop.yarn.api.records.Token nmToken,
564       org.apache.hadoop.yarn.api.records.Token containerToken,
565       NodeId nodeId, String user) throws Exception {
566 
567     ContainerLaunchContext context =
568         Records.newRecord(ContainerLaunchContext.class);
569     StartContainerRequest scRequest =
570         StartContainerRequest.newInstance(context,containerToken);
571     List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
572     list.add(scRequest);
573     StartContainersRequest allRequests =
574         StartContainersRequest.newInstance(list);
575     ContainerManagementProtocol proxy = null;
576     try {
577       proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
578       StartContainersResponse response = proxy.startContainers(allRequests);
579       for(SerializedException ex : response.getFailedRequests().values()){
580         parseAndThrowException(ex.deSerialize());
581       }
582     } finally {
583       if (proxy != null) {
584         rpc.stopProxy(proxy, conf);
585       }
586     }
587   }
588 
parseAndThrowException(Throwable t)589   private void parseAndThrowException(Throwable t) throws YarnException,
590       IOException {
591     if (t instanceof YarnException) {
592       throw (YarnException) t;
593     } else if (t instanceof InvalidToken) {
594       throw (InvalidToken) t;
595     } else {
596       throw (IOException) t;
597     }
598   }
599 
getContainerManagementProtocolProxy( final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, NodeId nodeId, String user)600   protected ContainerManagementProtocol getContainerManagementProtocolProxy(
601       final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken,
602       NodeId nodeId, String user) {
603     ContainerManagementProtocol proxy;
604     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
605     final InetSocketAddress addr =
606         NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort());
607     if (nmToken != null) {
608       ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr));
609     }
610     proxy =
611         NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, ugi,
612           rpc, addr);
613     return proxy;
614   }
615 
616   /**
617    * This tests a malice user getting a proper token but then messing with it by
618    * tampering with containerID/Resource etc.. His/her containers should be
619    * rejected.
620    *
621    * @throws IOException
622    * @throws InterruptedException
623    * @throws YarnException
624    */
testContainerToken(Configuration conf)625   private void testContainerToken(Configuration conf) throws IOException,
626       InterruptedException, YarnException {
627 
628     LOG.info("Running test for malice user");
629     /*
630      * We need to check for containerToken (authorization).
631      * Here we will be assuming that we have valid NMToken
632      * 1) ContainerToken used is expired.
633      * 2) ContainerToken is tampered (resource is modified).
634      */
635     NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
636         yarnCluster.getResourceManager().getRMContext()
637           .getNMTokenSecretManager();
638     ApplicationId appId = ApplicationId.newInstance(1, 1);
639     ApplicationAttemptId appAttemptId =
640         ApplicationAttemptId.newInstance(appId, 0);
641     ContainerId cId = ContainerId.newContainerId(appAttemptId, 0);
642     NodeManager nm = yarnCluster.getNodeManager(0);
643     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
644         nm.getNMContext().getNMTokenSecretManager();
645     String user = "test";
646 
647     waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
648 
649     NodeId nodeId = nm.getNMContext().getNodeId();
650 
651     // Both id should be equal.
652     Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
653         nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
654 
655 
656     RMContainerTokenSecretManager containerTokenSecretManager =
657         yarnCluster.getResourceManager().getRMContext().
658             getContainerTokenSecretManager();
659 
660     Resource r = Resource.newInstance(1230, 2);
661 
662     Token containerToken =
663         containerTokenSecretManager.createContainerToken(
664             cId, nodeId, user, r, Priority.newInstance(0), 0);
665 
666     ContainerTokenIdentifier containerTokenIdentifier =
667         getContainerTokenIdentifierFromToken(containerToken);
668 
669     // Verify new compatible version ContainerTokenIdentifier can work successfully.
670     ContainerTokenIdentifierForTest newVersionTokenIdentifier =
671         new ContainerTokenIdentifierForTest(containerTokenIdentifier, "message");
672     byte[] password =
673         containerTokenSecretManager.createPassword(newVersionTokenIdentifier);
674 
675     Token newContainerToken = BuilderUtils.newContainerToken(
676         nodeId, password, newVersionTokenIdentifier);
677 
678     Token nmToken =
679             nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
680     YarnRPC rpc = YarnRPC.create(conf);
681     Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
682         newContainerToken, nmToken, false).isEmpty());
683 
684     // Creating a tampered Container Token
685     RMContainerTokenSecretManager tamperedContainerTokenSecretManager =
686         new RMContainerTokenSecretManager(conf);
687     tamperedContainerTokenSecretManager.rollMasterKey();
688     do {
689       tamperedContainerTokenSecretManager.rollMasterKey();
690       tamperedContainerTokenSecretManager.activateNextMasterKey();
691     } while (containerTokenSecretManager.getCurrentKey().getKeyId()
692         == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId());
693 
694     ContainerId cId2 = ContainerId.newContainerId(appAttemptId, 1);
695     // Creating modified containerToken
696     Token containerToken2 =
697         tamperedContainerTokenSecretManager.createContainerToken(cId2, nodeId,
698             user, r, Priority.newInstance(0), 0);
699 
700     StringBuilder sb = new StringBuilder("Given Container ");
701     sb.append(cId2);
702     sb.append(" seems to have an illegally generated token.");
703     Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
704         containerToken2, nmToken, true).contains(sb.toString()));
705   }
706 
getContainerTokenIdentifierFromToken( Token containerToken)707   private ContainerTokenIdentifier getContainerTokenIdentifierFromToken(
708       Token containerToken) throws IOException {
709     ContainerTokenIdentifier containerTokenIdentifier;
710     containerTokenIdentifier = new ContainerTokenIdentifier();
711     byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
712     DataInputBuffer dib = new DataInputBuffer();
713     dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
714     containerTokenIdentifier.readFields(dib);
715     return containerTokenIdentifier;
716   }
717 
718   /**
719    * This tests whether a containerId is serialized/deserialized with epoch.
720    *
721    * @throws IOException
722    * @throws InterruptedException
723    * @throws YarnException
724    */
testContainerTokenWithEpoch(Configuration conf)725   private void testContainerTokenWithEpoch(Configuration conf)
726       throws IOException, InterruptedException, YarnException {
727 
728     LOG.info("Running test for serializing/deserializing containerIds");
729 
730     NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
731         yarnCluster.getResourceManager().getRMContext()
732             .getNMTokenSecretManager();
733     ApplicationId appId = ApplicationId.newInstance(1, 1);
734     ApplicationAttemptId appAttemptId =
735         ApplicationAttemptId.newInstance(appId, 0);
736     ContainerId cId = ContainerId.newContainerId(appAttemptId, (5L << 40) | 3L);
737     NodeManager nm = yarnCluster.getNodeManager(0);
738     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
739         nm.getNMContext().getNMTokenSecretManager();
740     String user = "test";
741 
742     waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
743 
744     NodeId nodeId = nm.getNMContext().getNodeId();
745 
746     // Both id should be equal.
747     Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
748         nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
749 
750     // Creating a normal Container Token
751     RMContainerTokenSecretManager containerTokenSecretManager =
752         yarnCluster.getResourceManager().getRMContext().
753             getContainerTokenSecretManager();
754     Resource r = Resource.newInstance(1230, 2);
755     Token containerToken =
756         containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
757             Priority.newInstance(0), 0);
758 
759     ContainerTokenIdentifier containerTokenIdentifier =
760         new ContainerTokenIdentifier();
761     byte[] tokenIdentifierContent = containerToken.getIdentifier().array();
762     DataInputBuffer dib = new DataInputBuffer();
763     dib.reset(tokenIdentifierContent, tokenIdentifierContent.length);
764     containerTokenIdentifier.readFields(dib);
765 
766 
767     Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
768     Assert.assertEquals(
769         cId.toString(), containerTokenIdentifier.getContainerID().toString());
770 
771     Token nmToken =
772         nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
773 
774     YarnRPC rpc = YarnRPC.create(conf);
775     testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken,
776         false);
777 
778     List<ContainerId> containerIds = new LinkedList<ContainerId>();
779     containerIds.add(cId);
780     ContainerManagementProtocol proxy
781         = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
782     GetContainerStatusesResponse res = proxy.getContainerStatuses(
783         GetContainerStatusesRequest.newInstance(containerIds));
784     Assert.assertNotNull(res.getContainerStatuses().get(0));
785     Assert.assertEquals(
786         cId, res.getContainerStatuses().get(0).getContainerId());
787     Assert.assertEquals(cId.toString(),
788         res.getContainerStatuses().get(0).getContainerId().toString());
789   }
790 }
791