1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.security; 20 21 import static org.junit.Assert.assertTrue; 22 import static org.junit.Assert.fail; 23 24 import java.io.IOException; 25 import java.net.InetSocketAddress; 26 import java.security.PrivilegedAction; 27 import java.security.PrivilegedExceptionAction; 28 29 import org.junit.Assert; 30 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 35 import org.apache.hadoop.mapred.JobConf; 36 import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; 37 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; 38 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest; 39 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; 40 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; 41 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest; 42 import org.apache.hadoop.mapreduce.v2.hs.HistoryClientService; 43 import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService; 44 import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager; 45 import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; 46 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 47 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; 48 import org.apache.hadoop.security.UserGroupInformation; 49 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 50 import org.apache.hadoop.yarn.api.records.Token; 51 import org.apache.hadoop.yarn.conf.YarnConfiguration; 52 import org.apache.hadoop.yarn.ipc.YarnRPC; 53 import org.apache.hadoop.yarn.util.ConverterUtils; 54 import org.apache.hadoop.yarn.util.Records; 55 import org.apache.log4j.Level; 56 import org.apache.log4j.LogManager; 57 import org.apache.log4j.Logger; 58 import org.junit.Test; 59 60 public class TestJHSSecurity { 61 62 private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class); 63 64 @Test testDelegationToken()65 public void testDelegationToken() throws IOException, InterruptedException { 66 67 Logger rootLogger = LogManager.getRootLogger(); 68 rootLogger.setLevel(Level.DEBUG); 69 70 final YarnConfiguration conf = new YarnConfiguration(new JobConf()); 71 // Just a random principle 72 conf.set(JHAdminConfig.MR_HISTORY_PRINCIPAL, 73 "RandomOrc/localhost@apache.org"); 74 75 conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 76 "kerberos"); 77 UserGroupInformation.setConfiguration(conf); 78 79 final long initialInterval = 10000l; 80 final long maxLifetime= 20000l; 81 final long renewInterval = 10000l; 82 83 JobHistoryServer jobHistoryServer = null; 84 MRClientProtocol clientUsingDT = null; 85 long tokenFetchTime; 86 try { 87 jobHistoryServer = new JobHistoryServer() { 88 protected void doSecureLogin(Configuration conf) throws IOException { 89 // no keytab based login 90 }; 91 92 @Override 93 protected JHSDelegationTokenSecretManager createJHSSecretManager( 94 Configuration conf, HistoryServerStateStoreService store) { 95 return new JHSDelegationTokenSecretManager(initialInterval, 96 maxLifetime, renewInterval, 3600000, store); 97 } 98 99 @Override 100 protected HistoryClientService createHistoryClientService() { 101 return new HistoryClientService(historyContext, 102 this.jhsDTSecretManager) { 103 @Override 104 protected void initializeWebApp(Configuration conf) { 105 // Don't need it, skip.; 106 } 107 }; 108 } 109 }; 110 // final JobHistoryServer jobHistoryServer = jhServer; 111 jobHistoryServer.init(conf); 112 jobHistoryServer.start(); 113 final MRClientProtocol hsService = jobHistoryServer.getClientService() 114 .getClientHandler(); 115 116 // Fake the authentication-method 117 UserGroupInformation loggedInUser = UserGroupInformation 118 .createRemoteUser("testrenewer@APACHE.ORG"); 119 Assert.assertEquals("testrenewer", loggedInUser.getShortUserName()); 120 // Default realm is APACHE.ORG 121 loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS); 122 123 124 Token token = getDelegationToken(loggedInUser, hsService, 125 loggedInUser.getShortUserName()); 126 tokenFetchTime = System.currentTimeMillis(); 127 LOG.info("Got delegation token at: " + tokenFetchTime); 128 129 // Now try talking to JHS using the delegation token 130 clientUsingDT = getMRClientProtocol(token, jobHistoryServer 131 .getClientService().getBindAddress(), "TheDarkLord", conf); 132 133 GetJobReportRequest jobReportRequest = 134 Records.newRecord(GetJobReportRequest.class); 135 jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1)); 136 try { 137 clientUsingDT.getJobReport(jobReportRequest); 138 } catch (IOException e) { 139 Assert.assertEquals("Unknown job job_123456_0001", e.getMessage()); 140 } 141 142 // Renew after 50% of token age. 143 while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) { 144 Thread.sleep(500l); 145 } 146 long nextExpTime = renewDelegationToken(loggedInUser, hsService, token); 147 long renewalTime = System.currentTimeMillis(); 148 LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: " 149 + nextExpTime); 150 151 // Wait for first expiry, but before renewed expiry. 152 while (System.currentTimeMillis() > tokenFetchTime + initialInterval 153 && System.currentTimeMillis() < nextExpTime) { 154 Thread.sleep(500l); 155 } 156 Thread.sleep(50l); 157 158 // Valid token because of renewal. 159 try { 160 clientUsingDT.getJobReport(jobReportRequest); 161 } catch (IOException e) { 162 Assert.assertEquals("Unknown job job_123456_0001", e.getMessage()); 163 } 164 165 // Wait for expiry. 166 while(System.currentTimeMillis() < renewalTime + renewInterval) { 167 Thread.sleep(500l); 168 } 169 Thread.sleep(50l); 170 LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid"); 171 // Token should have expired. 172 try { 173 clientUsingDT.getJobReport(jobReportRequest); 174 fail("Should not have succeeded with an expired token"); 175 } catch (IOException e) { 176 assertTrue(e.getCause().getMessage().contains("is expired")); 177 } 178 179 // Test cancellation 180 // Stop the existing proxy, start another. 181 if (clientUsingDT != null) { 182 // RPC.stopProxy(clientUsingDT); 183 clientUsingDT = null; 184 } 185 token = getDelegationToken(loggedInUser, hsService, 186 loggedInUser.getShortUserName()); 187 tokenFetchTime = System.currentTimeMillis(); 188 LOG.info("Got delegation token at: " + tokenFetchTime); 189 190 // Now try talking to HSService using the delegation token 191 clientUsingDT = getMRClientProtocol(token, jobHistoryServer 192 .getClientService().getBindAddress(), "loginuser2", conf); 193 194 195 try { 196 clientUsingDT.getJobReport(jobReportRequest); 197 } catch (IOException e) { 198 fail("Unexpected exception" + e); 199 } 200 cancelDelegationToken(loggedInUser, hsService, token); 201 202 // Testing the token with different renewer to cancel the token 203 Token tokenWithDifferentRenewer = getDelegationToken(loggedInUser, 204 hsService, "yarn"); 205 cancelDelegationToken(loggedInUser, hsService, tokenWithDifferentRenewer); 206 if (clientUsingDT != null) { 207 // RPC.stopProxy(clientUsingDT); 208 clientUsingDT = null; 209 } 210 211 // Creating a new connection. 212 clientUsingDT = getMRClientProtocol(token, jobHistoryServer 213 .getClientService().getBindAddress(), "loginuser2", conf); 214 LOG.info("Cancelled delegation token at: " + System.currentTimeMillis()); 215 // Verify cancellation worked. 216 try { 217 clientUsingDT.getJobReport(jobReportRequest); 218 fail("Should not have succeeded with a cancelled delegation token"); 219 } catch (IOException e) { 220 } 221 222 223 224 } finally { 225 jobHistoryServer.stop(); 226 } 227 } 228 getDelegationToken( final UserGroupInformation loggedInUser, final MRClientProtocol hsService, final String renewerString)229 private Token getDelegationToken( 230 final UserGroupInformation loggedInUser, 231 final MRClientProtocol hsService, final String renewerString) 232 throws IOException, InterruptedException { 233 // Get the delegation token directly as it is a little difficult to setup 234 // the kerberos based rpc. 235 Token token = loggedInUser 236 .doAs(new PrivilegedExceptionAction<Token>() { 237 @Override 238 public Token run() throws IOException { 239 GetDelegationTokenRequest request = Records 240 .newRecord(GetDelegationTokenRequest.class); 241 request.setRenewer(renewerString); 242 return hsService.getDelegationToken(request).getDelegationToken(); 243 } 244 245 }); 246 return token; 247 } 248 renewDelegationToken(final UserGroupInformation loggedInUser, final MRClientProtocol hsService, final Token dToken)249 private long renewDelegationToken(final UserGroupInformation loggedInUser, 250 final MRClientProtocol hsService, final Token dToken) 251 throws IOException, InterruptedException { 252 long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() { 253 254 @Override 255 public Long run() throws IOException { 256 RenewDelegationTokenRequest request = Records 257 .newRecord(RenewDelegationTokenRequest.class); 258 request.setDelegationToken(dToken); 259 return hsService.renewDelegationToken(request).getNextExpirationTime(); 260 } 261 }); 262 return nextExpTime; 263 } 264 cancelDelegationToken(final UserGroupInformation loggedInUser, final MRClientProtocol hsService, final Token dToken)265 private void cancelDelegationToken(final UserGroupInformation loggedInUser, 266 final MRClientProtocol hsService, final Token dToken) 267 throws IOException, InterruptedException { 268 269 loggedInUser.doAs(new PrivilegedExceptionAction<Void>() { 270 @Override 271 public Void run() throws IOException { 272 CancelDelegationTokenRequest request = Records 273 .newRecord(CancelDelegationTokenRequest.class); 274 request.setDelegationToken(dToken); 275 hsService.cancelDelegationToken(request); 276 return null; 277 } 278 }); 279 } 280 getMRClientProtocol(Token token, final InetSocketAddress hsAddress, String user, final Configuration conf)281 private MRClientProtocol getMRClientProtocol(Token token, 282 final InetSocketAddress hsAddress, String user, final Configuration conf) { 283 UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); 284 ugi.addToken(ConverterUtils.convertFromYarn(token, hsAddress)); 285 286 final YarnRPC rpc = YarnRPC.create(conf); 287 MRClientProtocol hsWithDT = ugi 288 .doAs(new PrivilegedAction<MRClientProtocol>() { 289 290 @Override 291 public MRClientProtocol run() { 292 return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class, 293 hsAddress, conf); 294 } 295 }); 296 return hsWithDT; 297 } 298 299 } 300