1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.security;
20 
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 
24 import java.io.IOException;
25 import java.net.InetSocketAddress;
26 import java.security.PrivilegedAction;
27 import java.security.PrivilegedExceptionAction;
28 
29 import org.junit.Assert;
30 
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
35 import org.apache.hadoop.mapred.JobConf;
36 import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
37 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
38 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
39 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
40 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
41 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
42 import org.apache.hadoop.mapreduce.v2.hs.HistoryClientService;
43 import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
44 import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
45 import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
46 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
47 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
48 import org.apache.hadoop.security.UserGroupInformation;
49 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
50 import org.apache.hadoop.yarn.api.records.Token;
51 import org.apache.hadoop.yarn.conf.YarnConfiguration;
52 import org.apache.hadoop.yarn.ipc.YarnRPC;
53 import org.apache.hadoop.yarn.util.ConverterUtils;
54 import org.apache.hadoop.yarn.util.Records;
55 import org.apache.log4j.Level;
56 import org.apache.log4j.LogManager;
57 import org.apache.log4j.Logger;
58 import org.junit.Test;
59 
60 public class TestJHSSecurity {
61 
62   private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class);
63 
64   @Test
testDelegationToken()65   public void testDelegationToken() throws IOException, InterruptedException {
66 
67     Logger rootLogger = LogManager.getRootLogger();
68     rootLogger.setLevel(Level.DEBUG);
69 
70     final YarnConfiguration conf = new YarnConfiguration(new JobConf());
71     // Just a random principle
72     conf.set(JHAdminConfig.MR_HISTORY_PRINCIPAL,
73       "RandomOrc/localhost@apache.org");
74 
75     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
76       "kerberos");
77     UserGroupInformation.setConfiguration(conf);
78 
79     final long initialInterval = 10000l;
80     final long maxLifetime= 20000l;
81     final long renewInterval = 10000l;
82 
83     JobHistoryServer jobHistoryServer = null;
84     MRClientProtocol clientUsingDT = null;
85     long tokenFetchTime;
86     try {
87       jobHistoryServer = new JobHistoryServer() {
88         protected void doSecureLogin(Configuration conf) throws IOException {
89           // no keytab based login
90         };
91 
92         @Override
93         protected JHSDelegationTokenSecretManager createJHSSecretManager(
94             Configuration conf, HistoryServerStateStoreService store) {
95           return new JHSDelegationTokenSecretManager(initialInterval,
96               maxLifetime, renewInterval, 3600000, store);
97         }
98 
99         @Override
100         protected HistoryClientService createHistoryClientService() {
101           return new HistoryClientService(historyContext,
102             this.jhsDTSecretManager) {
103             @Override
104             protected void initializeWebApp(Configuration conf) {
105               // Don't need it, skip.;
106               }
107           };
108         }
109       };
110 //      final JobHistoryServer jobHistoryServer = jhServer;
111       jobHistoryServer.init(conf);
112       jobHistoryServer.start();
113       final MRClientProtocol hsService = jobHistoryServer.getClientService()
114           .getClientHandler();
115 
116       // Fake the authentication-method
117       UserGroupInformation loggedInUser = UserGroupInformation
118           .createRemoteUser("testrenewer@APACHE.ORG");
119       Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
120    // Default realm is APACHE.ORG
121       loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
122 
123 
124       Token token = getDelegationToken(loggedInUser, hsService,
125           loggedInUser.getShortUserName());
126       tokenFetchTime = System.currentTimeMillis();
127       LOG.info("Got delegation token at: " + tokenFetchTime);
128 
129       // Now try talking to JHS using the delegation token
130       clientUsingDT = getMRClientProtocol(token, jobHistoryServer
131           .getClientService().getBindAddress(), "TheDarkLord", conf);
132 
133       GetJobReportRequest jobReportRequest =
134           Records.newRecord(GetJobReportRequest.class);
135       jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
136       try {
137         clientUsingDT.getJobReport(jobReportRequest);
138       } catch (IOException e) {
139         Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
140       }
141 
142    // Renew after 50% of token age.
143       while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) {
144         Thread.sleep(500l);
145       }
146       long nextExpTime = renewDelegationToken(loggedInUser, hsService, token);
147       long renewalTime = System.currentTimeMillis();
148       LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: "
149           + nextExpTime);
150 
151       // Wait for first expiry, but before renewed expiry.
152       while (System.currentTimeMillis() > tokenFetchTime + initialInterval
153           && System.currentTimeMillis() < nextExpTime) {
154         Thread.sleep(500l);
155       }
156       Thread.sleep(50l);
157 
158       // Valid token because of renewal.
159       try {
160         clientUsingDT.getJobReport(jobReportRequest);
161       } catch (IOException e) {
162         Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
163       }
164 
165       // Wait for expiry.
166       while(System.currentTimeMillis() < renewalTime + renewInterval) {
167         Thread.sleep(500l);
168       }
169       Thread.sleep(50l);
170       LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid");
171       // Token should have expired.
172       try {
173         clientUsingDT.getJobReport(jobReportRequest);
174         fail("Should not have succeeded with an expired token");
175       } catch (IOException e) {
176         assertTrue(e.getCause().getMessage().contains("is expired"));
177       }
178 
179       // Test cancellation
180       // Stop the existing proxy, start another.
181       if (clientUsingDT != null) {
182 //        RPC.stopProxy(clientUsingDT);
183         clientUsingDT = null;
184       }
185       token = getDelegationToken(loggedInUser, hsService,
186           loggedInUser.getShortUserName());
187       tokenFetchTime = System.currentTimeMillis();
188       LOG.info("Got delegation token at: " + tokenFetchTime);
189 
190       // Now try talking to HSService using the delegation token
191       clientUsingDT = getMRClientProtocol(token, jobHistoryServer
192           .getClientService().getBindAddress(), "loginuser2", conf);
193 
194 
195       try {
196         clientUsingDT.getJobReport(jobReportRequest);
197       } catch (IOException e) {
198         fail("Unexpected exception" + e);
199       }
200       cancelDelegationToken(loggedInUser, hsService, token);
201 
202       // Testing the token with different renewer to cancel the token
203       Token tokenWithDifferentRenewer = getDelegationToken(loggedInUser,
204           hsService, "yarn");
205       cancelDelegationToken(loggedInUser, hsService, tokenWithDifferentRenewer);
206       if (clientUsingDT != null) {
207 //        RPC.stopProxy(clientUsingDT);
208         clientUsingDT = null;
209       }
210 
211       // Creating a new connection.
212       clientUsingDT = getMRClientProtocol(token, jobHistoryServer
213           .getClientService().getBindAddress(), "loginuser2", conf);
214       LOG.info("Cancelled delegation token at: " + System.currentTimeMillis());
215       // Verify cancellation worked.
216       try {
217         clientUsingDT.getJobReport(jobReportRequest);
218         fail("Should not have succeeded with a cancelled delegation token");
219       } catch (IOException e) {
220       }
221 
222 
223 
224     } finally {
225       jobHistoryServer.stop();
226     }
227   }
228 
getDelegationToken( final UserGroupInformation loggedInUser, final MRClientProtocol hsService, final String renewerString)229   private Token getDelegationToken(
230       final UserGroupInformation loggedInUser,
231       final MRClientProtocol hsService, final String renewerString)
232       throws IOException, InterruptedException {
233     // Get the delegation token directly as it is a little difficult to setup
234     // the kerberos based rpc.
235     Token token = loggedInUser
236         .doAs(new PrivilegedExceptionAction<Token>() {
237           @Override
238           public Token run() throws IOException {
239             GetDelegationTokenRequest request = Records
240                 .newRecord(GetDelegationTokenRequest.class);
241             request.setRenewer(renewerString);
242             return hsService.getDelegationToken(request).getDelegationToken();
243           }
244 
245         });
246     return token;
247   }
248 
renewDelegationToken(final UserGroupInformation loggedInUser, final MRClientProtocol hsService, final Token dToken)249   private long renewDelegationToken(final UserGroupInformation loggedInUser,
250       final MRClientProtocol hsService, final Token dToken)
251       throws IOException, InterruptedException {
252     long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
253 
254       @Override
255       public Long run() throws IOException {
256         RenewDelegationTokenRequest request = Records
257             .newRecord(RenewDelegationTokenRequest.class);
258         request.setDelegationToken(dToken);
259         return hsService.renewDelegationToken(request).getNextExpirationTime();
260       }
261     });
262     return nextExpTime;
263   }
264 
cancelDelegationToken(final UserGroupInformation loggedInUser, final MRClientProtocol hsService, final Token dToken)265   private void cancelDelegationToken(final UserGroupInformation loggedInUser,
266       final MRClientProtocol hsService, final Token dToken)
267       throws IOException, InterruptedException {
268 
269     loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
270       @Override
271       public Void run() throws IOException {
272         CancelDelegationTokenRequest request = Records
273             .newRecord(CancelDelegationTokenRequest.class);
274         request.setDelegationToken(dToken);
275         hsService.cancelDelegationToken(request);
276         return null;
277       }
278     });
279   }
280 
getMRClientProtocol(Token token, final InetSocketAddress hsAddress, String user, final Configuration conf)281   private MRClientProtocol getMRClientProtocol(Token token,
282       final InetSocketAddress hsAddress, String user, final Configuration conf) {
283     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
284     ugi.addToken(ConverterUtils.convertFromYarn(token, hsAddress));
285 
286     final YarnRPC rpc = YarnRPC.create(conf);
287     MRClientProtocol hsWithDT = ugi
288         .doAs(new PrivilegedAction<MRClientProtocol>() {
289 
290           @Override
291           public MRClientProtocol run() {
292             return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
293                 hsAddress, conf);
294           }
295         });
296     return hsWithDT;
297   }
298 
299 }
300