1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.security.token; 20 21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; 22 import static org.junit.Assert.assertEquals; 23 import static org.junit.Assert.assertFalse; 24 import static org.junit.Assert.assertNotNull; 25 import static org.junit.Assert.assertTrue; 26 27 import java.io.IOException; 28 import java.net.InetSocketAddress; 29 import java.security.PrivilegedExceptionAction; 30 import java.util.ArrayList; 31 import java.util.List; 32 import java.util.concurrent.ConcurrentMap; 33 import java.util.concurrent.ExecutorService; 34 35 import org.apache.commons.logging.Log; 36 import org.apache.commons.logging.LogFactory; 37 import org.apache.hadoop.conf.Configuration; 38 import org.apache.hadoop.hbase.ChoreService; 39 import org.apache.hadoop.hbase.ClusterId; 40 import org.apache.hadoop.hbase.CoordinatedStateManager; 41 import org.apache.hadoop.hbase.Coprocessor; 42 import org.apache.hadoop.hbase.HBaseTestingUtility; 43 import org.apache.hadoop.hbase.HConstants; 44 import org.apache.hadoop.hbase.HRegionInfo; 45 import org.apache.hadoop.hbase.Server; 46 import org.apache.hadoop.hbase.ServerName; 47 import org.apache.hadoop.hbase.TableName; 48 import org.apache.hadoop.hbase.client.ClusterConnection; 49 import org.apache.hadoop.hbase.client.Connection; 50 import org.apache.hadoop.hbase.client.ConnectionFactory; 51 import org.apache.hadoop.hbase.client.HTableInterface; 52 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 53 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; 54 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 55 import org.apache.hadoop.hbase.ipc.RpcClient; 56 import org.apache.hadoop.hbase.ipc.RpcClientFactory; 57 import org.apache.hadoop.hbase.ipc.RpcServer; 58 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 59 import org.apache.hadoop.hbase.ipc.RpcServerInterface; 60 import org.apache.hadoop.hbase.ipc.ServerRpcController; 61 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 62 import org.apache.hadoop.hbase.regionserver.HRegion; 63 import org.apache.hadoop.hbase.regionserver.RegionServerServices; 64 import org.apache.hadoop.hbase.security.SecurityInfo; 65 import org.apache.hadoop.hbase.security.User; 66 import org.apache.hadoop.hbase.testclassification.MediumTests; 67 import org.apache.hadoop.hbase.util.Bytes; 68 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 69 import org.apache.hadoop.hbase.util.Sleeper; 70 import org.apache.hadoop.hbase.util.Strings; 71 import org.apache.hadoop.hbase.util.Threads; 72 import org.apache.hadoop.hbase.util.Writables; 73 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 74 import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 75 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; 76 import org.apache.hadoop.net.DNS; 77 import org.apache.hadoop.security.UserGroupInformation; 78 import org.apache.hadoop.security.authorize.PolicyProvider; 79 import org.apache.hadoop.security.authorize.Service; 80 import org.apache.hadoop.security.token.SecretManager; 81 import org.apache.hadoop.security.token.Token; 82 import org.apache.hadoop.security.token.TokenIdentifier; 83 import org.junit.AfterClass; 84 import org.junit.BeforeClass; 85 import org.junit.Test; 86 import org.junit.experimental.categories.Category; 87 88 import com.google.protobuf.BlockingRpcChannel; 89 import com.google.protobuf.BlockingService; 90 import com.google.protobuf.RpcController; 91 import com.google.protobuf.ServiceException; 92 93 /** 94 * Tests for authentication token creation and usage 95 */ 96 @Category(MediumTests.class) 97 public class TestTokenAuthentication { 98 static { 99 // Setting whatever system properties after recommendation from 100 // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html 101 System.setProperty("java.security.krb5.realm", "hbase"); 102 System.setProperty("java.security.krb5.kdc", "blah"); 103 } 104 private static final Log LOG = LogFactory.getLog(TestTokenAuthentication.class); 105 106 public interface AuthenticationServiceSecurityInfo {} 107 108 /** 109 * Basic server process for RPC authentication testing 110 */ 111 private static class TokenServer extends TokenProvider 112 implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server { 113 private static final Log LOG = LogFactory.getLog(TokenServer.class); 114 private Configuration conf; 115 private RpcServerInterface rpcServer; 116 private InetSocketAddress isa; 117 private ZooKeeperWatcher zookeeper; 118 private Sleeper sleeper; 119 private boolean started = false; 120 private boolean aborted = false; 121 private boolean stopped = false; 122 private long startcode; 123 TokenServer(Configuration conf)124 public TokenServer(Configuration conf) throws IOException { 125 this.conf = conf; 126 this.startcode = EnvironmentEdgeManager.currentTime(); 127 // Server to handle client requests. 128 String hostname = 129 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default")); 130 int port = 0; 131 // Creation of an ISA will force a resolve. 132 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); 133 if (initialIsa.getAddress() == null) { 134 throw new IllegalArgumentException("Failed resolve of " + initialIsa); 135 } 136 final List<BlockingServiceAndInterface> sai = 137 new ArrayList<BlockingServiceAndInterface>(1); 138 BlockingService service = 139 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this); 140 sai.add(new BlockingServiceAndInterface(service, 141 AuthenticationProtos.AuthenticationService.BlockingInterface.class)); 142 this.rpcServer = 143 new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); 144 InetSocketAddress address = rpcServer.getListenerAddress(); 145 if (address == null) { 146 throw new IOException("Listener channel is closed"); 147 } 148 this.isa = address; 149 this.sleeper = new Sleeper(1000, this); 150 } 151 152 @Override getConfiguration()153 public Configuration getConfiguration() { 154 return conf; 155 } 156 157 @Override getConnection()158 public ClusterConnection getConnection() { 159 return null; 160 } 161 162 @Override getMetaTableLocator()163 public MetaTableLocator getMetaTableLocator() { 164 return null; 165 } 166 167 @Override getZooKeeper()168 public ZooKeeperWatcher getZooKeeper() { 169 return zookeeper; 170 } 171 172 @Override getCoordinatedStateManager()173 public CoordinatedStateManager getCoordinatedStateManager() { 174 return null; 175 } 176 177 @Override isAborted()178 public boolean isAborted() { 179 return aborted; 180 } 181 182 @Override getServerName()183 public ServerName getServerName() { 184 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode); 185 } 186 187 @Override abort(String reason, Throwable error)188 public void abort(String reason, Throwable error) { 189 LOG.fatal("Aborting on: "+reason, error); 190 this.aborted = true; 191 this.stopped = true; 192 sleeper.skipSleepCycle(); 193 } 194 initialize()195 private void initialize() throws IOException { 196 // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth 197 Configuration zkConf = new Configuration(conf); 198 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 199 this.zookeeper = new ZooKeeperWatcher(zkConf, TokenServer.class.getSimpleName(), 200 this, true); 201 this.rpcServer.start(); 202 203 // mock RegionServerServices to provide to coprocessor environment 204 final RegionServerServices mockServices = TEST_UTIL.createMockRegionServerService(rpcServer); 205 206 // mock up coprocessor environment 207 super.start(new RegionCoprocessorEnvironment() { 208 @Override 209 public HRegion getRegion() { return null; } 210 211 @Override 212 public RegionServerServices getRegionServerServices() { 213 return mockServices; 214 } 215 216 @Override 217 public ConcurrentMap<String, Object> getSharedData() { return null; } 218 219 @Override 220 public int getVersion() { return 0; } 221 222 @Override 223 public String getHBaseVersion() { return null; } 224 225 @Override 226 public Coprocessor getInstance() { return null; } 227 228 @Override 229 public int getPriority() { return 0; } 230 231 @Override 232 public int getLoadSequence() { return 0; } 233 234 @Override 235 public Configuration getConfiguration() { return conf; } 236 237 @Override 238 public HTableInterface getTable(TableName tableName) throws IOException 239 { return null; } 240 241 @Override 242 public HTableInterface getTable(TableName tableName, ExecutorService service) 243 throws IOException { 244 return null; 245 } 246 247 @Override 248 public ClassLoader getClassLoader() { 249 return Thread.currentThread().getContextClassLoader(); 250 } 251 252 @Override 253 public HRegionInfo getRegionInfo() { 254 return null; 255 } 256 }); 257 258 started = true; 259 } 260 run()261 public void run() { 262 try { 263 initialize(); 264 while (!stopped) { 265 this.sleeper.sleep(); 266 } 267 } catch (Exception e) { 268 abort(e.getMessage(), e); 269 } 270 this.rpcServer.stop(); 271 } 272 isStarted()273 public boolean isStarted() { 274 return started; 275 } 276 277 @Override stop(String reason)278 public void stop(String reason) { 279 LOG.info("Stopping due to: "+reason); 280 this.stopped = true; 281 sleeper.skipSleepCycle(); 282 } 283 284 @Override isStopped()285 public boolean isStopped() { 286 return stopped; 287 } 288 getAddress()289 public InetSocketAddress getAddress() { 290 return isa; 291 } 292 getSecretManager()293 public SecretManager<? extends TokenIdentifier> getSecretManager() { 294 return ((RpcServer)rpcServer).getSecretManager(); 295 } 296 297 @Override getAuthenticationToken( RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)298 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken( 299 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request) 300 throws ServiceException { 301 LOG.debug("Authentication token request from " + RpcServer.getRequestUserName()); 302 // ignore passed in controller -- it's always null 303 ServerRpcController serverController = new ServerRpcController(); 304 BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback = 305 new BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>(); 306 getAuthenticationToken(serverController, request, callback); 307 try { 308 serverController.checkFailed(); 309 return callback.get(); 310 } catch (IOException ioe) { 311 throw new ServiceException(ioe); 312 } 313 } 314 315 @Override whoAmI( RpcController controller, AuthenticationProtos.WhoAmIRequest request)316 public AuthenticationProtos.WhoAmIResponse whoAmI( 317 RpcController controller, AuthenticationProtos.WhoAmIRequest request) 318 throws ServiceException { 319 LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName()); 320 // ignore passed in controller -- it's always null 321 ServerRpcController serverController = new ServerRpcController(); 322 BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback = 323 new BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse>(); 324 whoAmI(serverController, request, callback); 325 try { 326 serverController.checkFailed(); 327 return callback.get(); 328 } catch (IOException ioe) { 329 throw new ServiceException(ioe); 330 } 331 } 332 333 @Override getChoreService()334 public ChoreService getChoreService() { 335 return null; 336 } 337 } 338 339 private static HBaseTestingUtility TEST_UTIL; 340 private static TokenServer server; 341 private static Thread serverThread; 342 private static AuthenticationTokenSecretManager secretManager; 343 private static ClusterId clusterId = new ClusterId(); 344 345 @BeforeClass setupBeforeClass()346 public static void setupBeforeClass() throws Exception { 347 TEST_UTIL = new HBaseTestingUtility(); 348 TEST_UTIL.startMiniZKCluster(); 349 // register token type for protocol 350 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(), 351 new SecurityInfo("hbase.test.kerberos.principal", 352 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN)); 353 // security settings only added after startup so that ZK does not require SASL 354 Configuration conf = TEST_UTIL.getConfiguration(); 355 conf.set("hadoop.security.authentication", "kerberos"); 356 conf.set("hbase.security.authentication", "kerberos"); 357 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true); 358 server = new TokenServer(conf); 359 serverThread = new Thread(server); 360 Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString()); 361 // wait for startup 362 while (!server.isStarted() && !server.isStopped()) { 363 Thread.sleep(10); 364 } 365 server.rpcServer.refreshAuthManager(new PolicyProvider() { 366 @Override 367 public Service[] getServices() { 368 return new Service [] { 369 new Service("security.client.protocol.acl", 370 AuthenticationProtos.AuthenticationService.BlockingInterface.class)}; 371 } 372 }); 373 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId); 374 secretManager = (AuthenticationTokenSecretManager)server.getSecretManager(); 375 while(secretManager.getCurrentKey() == null) { 376 Thread.sleep(1); 377 } 378 } 379 380 @AfterClass tearDownAfterClass()381 public static void tearDownAfterClass() throws Exception { 382 server.stop("Test complete"); 383 Threads.shutdown(serverThread); 384 TEST_UTIL.shutdownMiniZKCluster(); 385 } 386 387 @Test testTokenCreation()388 public void testTokenCreation() throws Exception { 389 Token<AuthenticationTokenIdentifier> token = 390 secretManager.generateToken("testuser"); 391 392 AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(); 393 Writables.getWritable(token.getIdentifier(), ident); 394 assertEquals("Token username should match", "testuser", 395 ident.getUsername()); 396 byte[] passwd = secretManager.retrievePassword(ident); 397 assertTrue("Token password and password from secret manager should match", 398 Bytes.equals(token.getPassword(), passwd)); 399 } 400 401 @Test testTokenAuthentication()402 public void testTokenAuthentication() throws Exception { 403 UserGroupInformation testuser = 404 UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"}); 405 406 testuser.setAuthenticationMethod( 407 UserGroupInformation.AuthenticationMethod.TOKEN); 408 final Configuration conf = TEST_UTIL.getConfiguration(); 409 UserGroupInformation.setConfiguration(conf); 410 Token<AuthenticationTokenIdentifier> token = 411 secretManager.generateToken("testuser"); 412 LOG.debug("Got token: " + token.toString()); 413 testuser.addToken(token); 414 415 // verify the server authenticates us as this token user 416 testuser.doAs(new PrivilegedExceptionAction<Object>() { 417 public Object run() throws Exception { 418 Configuration c = server.getConfiguration(); 419 RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString()); 420 ServerName sn = 421 ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(), 422 System.currentTimeMillis()); 423 try { 424 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, 425 User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 426 AuthenticationProtos.AuthenticationService.BlockingInterface stub = 427 AuthenticationProtos.AuthenticationService.newBlockingStub(channel); 428 AuthenticationProtos.WhoAmIResponse response = 429 stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); 430 String myname = response.getUsername(); 431 assertEquals("testuser", myname); 432 String authMethod = response.getAuthMethod(); 433 assertEquals("TOKEN", authMethod); 434 } finally { 435 rpcClient.close(); 436 } 437 return null; 438 } 439 }); 440 } 441 442 @Test testUseExistingToken()443 public void testUseExistingToken() throws Exception { 444 User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2", 445 new String[]{"testgroup"}); 446 Token<AuthenticationTokenIdentifier> token = 447 secretManager.generateToken(user.getName()); 448 assertNotNull(token); 449 user.addToken(token); 450 451 // make sure we got a token 452 Token<AuthenticationTokenIdentifier> firstToken = 453 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 454 assertNotNull(firstToken); 455 assertEquals(token, firstToken); 456 457 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 458 try { 459 assertFalse(TokenUtil.addTokenIfMissing(conn, user)); 460 // make sure we still have the same token 461 Token<AuthenticationTokenIdentifier> secondToken = 462 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); 463 assertEquals(firstToken, secondToken); 464 } finally { 465 conn.close(); 466 } 467 } 468 } 469