1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hbase.security.token;
20 
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26 
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
29 import java.security.PrivilegedExceptionAction;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.ExecutorService;
34 
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.ChoreService;
39 import org.apache.hadoop.hbase.ClusterId;
40 import org.apache.hadoop.hbase.CoordinatedStateManager;
41 import org.apache.hadoop.hbase.Coprocessor;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.Server;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.ClusterConnection;
49 import org.apache.hadoop.hbase.client.Connection;
50 import org.apache.hadoop.hbase.client.ConnectionFactory;
51 import org.apache.hadoop.hbase.client.HTableInterface;
52 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
53 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
54 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
55 import org.apache.hadoop.hbase.ipc.RpcClient;
56 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
57 import org.apache.hadoop.hbase.ipc.RpcServer;
58 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
59 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
60 import org.apache.hadoop.hbase.ipc.ServerRpcController;
61 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
62 import org.apache.hadoop.hbase.regionserver.HRegion;
63 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
64 import org.apache.hadoop.hbase.security.SecurityInfo;
65 import org.apache.hadoop.hbase.security.User;
66 import org.apache.hadoop.hbase.testclassification.MediumTests;
67 import org.apache.hadoop.hbase.util.Bytes;
68 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
69 import org.apache.hadoop.hbase.util.Sleeper;
70 import org.apache.hadoop.hbase.util.Strings;
71 import org.apache.hadoop.hbase.util.Threads;
72 import org.apache.hadoop.hbase.util.Writables;
73 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
74 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
75 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
76 import org.apache.hadoop.net.DNS;
77 import org.apache.hadoop.security.UserGroupInformation;
78 import org.apache.hadoop.security.authorize.PolicyProvider;
79 import org.apache.hadoop.security.authorize.Service;
80 import org.apache.hadoop.security.token.SecretManager;
81 import org.apache.hadoop.security.token.Token;
82 import org.apache.hadoop.security.token.TokenIdentifier;
83 import org.junit.AfterClass;
84 import org.junit.BeforeClass;
85 import org.junit.Test;
86 import org.junit.experimental.categories.Category;
87 
88 import com.google.protobuf.BlockingRpcChannel;
89 import com.google.protobuf.BlockingService;
90 import com.google.protobuf.RpcController;
91 import com.google.protobuf.ServiceException;
92 
93 /**
94  * Tests for authentication token creation and usage
95  */
96 @Category(MediumTests.class)
97 public class TestTokenAuthentication {
98   static {
99     // Setting whatever system properties after recommendation from
100     // http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html
101     System.setProperty("java.security.krb5.realm", "hbase");
102     System.setProperty("java.security.krb5.kdc", "blah");
103   }
104   private static final Log LOG = LogFactory.getLog(TestTokenAuthentication.class);
105 
106   public interface AuthenticationServiceSecurityInfo {}
107 
108   /**
109    * Basic server process for RPC authentication testing
110    */
111   private static class TokenServer extends TokenProvider
112   implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
113     private static final Log LOG = LogFactory.getLog(TokenServer.class);
114     private Configuration conf;
115     private RpcServerInterface rpcServer;
116     private InetSocketAddress isa;
117     private ZooKeeperWatcher zookeeper;
118     private Sleeper sleeper;
119     private boolean started = false;
120     private boolean aborted = false;
121     private boolean stopped = false;
122     private long startcode;
123 
TokenServer(Configuration conf)124     public TokenServer(Configuration conf) throws IOException {
125       this.conf = conf;
126       this.startcode = EnvironmentEdgeManager.currentTime();
127       // Server to handle client requests.
128       String hostname =
129         Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
130       int port = 0;
131       // Creation of an ISA will force a resolve.
132       InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
133       if (initialIsa.getAddress() == null) {
134         throw new IllegalArgumentException("Failed resolve of " + initialIsa);
135       }
136       final List<BlockingServiceAndInterface> sai =
137         new ArrayList<BlockingServiceAndInterface>(1);
138       BlockingService service =
139         AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
140       sai.add(new BlockingServiceAndInterface(service,
141         AuthenticationProtos.AuthenticationService.BlockingInterface.class));
142       this.rpcServer =
143         new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
144       InetSocketAddress address = rpcServer.getListenerAddress();
145       if (address == null) {
146         throw new IOException("Listener channel is closed");
147       }
148       this.isa = address;
149       this.sleeper = new Sleeper(1000, this);
150     }
151 
152     @Override
getConfiguration()153     public Configuration getConfiguration() {
154       return conf;
155     }
156 
157     @Override
getConnection()158     public ClusterConnection getConnection() {
159       return null;
160     }
161 
162     @Override
getMetaTableLocator()163     public MetaTableLocator getMetaTableLocator() {
164       return null;
165     }
166 
167     @Override
getZooKeeper()168     public ZooKeeperWatcher getZooKeeper() {
169       return zookeeper;
170     }
171 
172     @Override
getCoordinatedStateManager()173     public CoordinatedStateManager getCoordinatedStateManager() {
174       return null;
175     }
176 
177     @Override
isAborted()178     public boolean isAborted() {
179       return aborted;
180     }
181 
182     @Override
getServerName()183     public ServerName getServerName() {
184       return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
185     }
186 
187     @Override
abort(String reason, Throwable error)188     public void abort(String reason, Throwable error) {
189       LOG.fatal("Aborting on: "+reason, error);
190       this.aborted = true;
191       this.stopped = true;
192       sleeper.skipSleepCycle();
193     }
194 
initialize()195     private void initialize() throws IOException {
196       // ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
197       Configuration zkConf = new Configuration(conf);
198       zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
199       this.zookeeper = new ZooKeeperWatcher(zkConf, TokenServer.class.getSimpleName(),
200           this, true);
201       this.rpcServer.start();
202 
203       // mock RegionServerServices to provide to coprocessor environment
204       final RegionServerServices mockServices = TEST_UTIL.createMockRegionServerService(rpcServer);
205 
206       // mock up coprocessor environment
207       super.start(new RegionCoprocessorEnvironment() {
208         @Override
209         public HRegion getRegion() { return null; }
210 
211         @Override
212         public RegionServerServices getRegionServerServices() {
213           return mockServices;
214         }
215 
216         @Override
217         public ConcurrentMap<String, Object> getSharedData() { return null; }
218 
219         @Override
220         public int getVersion() { return 0; }
221 
222         @Override
223         public String getHBaseVersion() { return null; }
224 
225         @Override
226         public Coprocessor getInstance() { return null; }
227 
228         @Override
229         public int getPriority() { return 0; }
230 
231         @Override
232         public int getLoadSequence() { return 0; }
233 
234         @Override
235         public Configuration getConfiguration() { return conf; }
236 
237         @Override
238         public HTableInterface getTable(TableName tableName) throws IOException
239           { return null; }
240 
241         @Override
242         public HTableInterface getTable(TableName tableName, ExecutorService service)
243             throws IOException {
244           return null;
245         }
246 
247         @Override
248         public ClassLoader getClassLoader() {
249           return Thread.currentThread().getContextClassLoader();
250         }
251 
252         @Override
253         public HRegionInfo getRegionInfo() {
254           return null;
255         }
256       });
257 
258       started = true;
259     }
260 
run()261     public void run() {
262       try {
263         initialize();
264         while (!stopped) {
265           this.sleeper.sleep();
266         }
267       } catch (Exception e) {
268         abort(e.getMessage(), e);
269       }
270       this.rpcServer.stop();
271     }
272 
isStarted()273     public boolean isStarted() {
274       return started;
275     }
276 
277     @Override
stop(String reason)278     public void stop(String reason) {
279       LOG.info("Stopping due to: "+reason);
280       this.stopped = true;
281       sleeper.skipSleepCycle();
282     }
283 
284     @Override
isStopped()285     public boolean isStopped() {
286       return stopped;
287     }
288 
getAddress()289     public InetSocketAddress getAddress() {
290       return isa;
291     }
292 
getSecretManager()293     public SecretManager<? extends TokenIdentifier> getSecretManager() {
294       return ((RpcServer)rpcServer).getSecretManager();
295     }
296 
297     @Override
getAuthenticationToken( RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)298     public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
299         RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
300       throws ServiceException {
301       LOG.debug("Authentication token request from " + RpcServer.getRequestUserName());
302       // ignore passed in controller -- it's always null
303       ServerRpcController serverController = new ServerRpcController();
304       BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
305           new BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>();
306       getAuthenticationToken(serverController, request, callback);
307       try {
308         serverController.checkFailed();
309         return callback.get();
310       } catch (IOException ioe) {
311         throw new ServiceException(ioe);
312       }
313     }
314 
315     @Override
whoAmI( RpcController controller, AuthenticationProtos.WhoAmIRequest request)316     public AuthenticationProtos.WhoAmIResponse whoAmI(
317         RpcController controller, AuthenticationProtos.WhoAmIRequest request)
318       throws ServiceException {
319       LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName());
320       // ignore passed in controller -- it's always null
321       ServerRpcController serverController = new ServerRpcController();
322       BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
323           new BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse>();
324       whoAmI(serverController, request, callback);
325       try {
326         serverController.checkFailed();
327         return callback.get();
328       } catch (IOException ioe) {
329         throw new ServiceException(ioe);
330       }
331     }
332 
333     @Override
getChoreService()334     public ChoreService getChoreService() {
335       return null;
336     }
337   }
338 
339   private static HBaseTestingUtility TEST_UTIL;
340   private static TokenServer server;
341   private static Thread serverThread;
342   private static AuthenticationTokenSecretManager secretManager;
343   private static ClusterId clusterId = new ClusterId();
344 
345   @BeforeClass
setupBeforeClass()346   public static void setupBeforeClass() throws Exception {
347     TEST_UTIL = new HBaseTestingUtility();
348     TEST_UTIL.startMiniZKCluster();
349     // register token type for protocol
350     SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
351       new SecurityInfo("hbase.test.kerberos.principal",
352         AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
353     // security settings only added after startup so that ZK does not require SASL
354     Configuration conf = TEST_UTIL.getConfiguration();
355     conf.set("hadoop.security.authentication", "kerberos");
356     conf.set("hbase.security.authentication", "kerberos");
357     conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
358     server = new TokenServer(conf);
359     serverThread = new Thread(server);
360     Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
361     // wait for startup
362     while (!server.isStarted() && !server.isStopped()) {
363       Thread.sleep(10);
364     }
365     server.rpcServer.refreshAuthManager(new PolicyProvider() {
366       @Override
367       public Service[] getServices() {
368         return new Service [] {
369           new Service("security.client.protocol.acl",
370             AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
371       }
372     });
373     ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
374     secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
375     while(secretManager.getCurrentKey() == null) {
376       Thread.sleep(1);
377     }
378   }
379 
380   @AfterClass
tearDownAfterClass()381   public static void tearDownAfterClass() throws Exception {
382     server.stop("Test complete");
383     Threads.shutdown(serverThread);
384     TEST_UTIL.shutdownMiniZKCluster();
385   }
386 
387   @Test
testTokenCreation()388   public void testTokenCreation() throws Exception {
389     Token<AuthenticationTokenIdentifier> token =
390         secretManager.generateToken("testuser");
391 
392     AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
393     Writables.getWritable(token.getIdentifier(), ident);
394     assertEquals("Token username should match", "testuser",
395         ident.getUsername());
396     byte[] passwd = secretManager.retrievePassword(ident);
397     assertTrue("Token password and password from secret manager should match",
398         Bytes.equals(token.getPassword(), passwd));
399   }
400 
401   @Test
testTokenAuthentication()402   public void testTokenAuthentication() throws Exception {
403     UserGroupInformation testuser =
404         UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
405 
406     testuser.setAuthenticationMethod(
407         UserGroupInformation.AuthenticationMethod.TOKEN);
408     final Configuration conf = TEST_UTIL.getConfiguration();
409     UserGroupInformation.setConfiguration(conf);
410     Token<AuthenticationTokenIdentifier> token =
411         secretManager.generateToken("testuser");
412     LOG.debug("Got token: " + token.toString());
413     testuser.addToken(token);
414 
415     // verify the server authenticates us as this token user
416     testuser.doAs(new PrivilegedExceptionAction<Object>() {
417       public Object run() throws Exception {
418         Configuration c = server.getConfiguration();
419         RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
420         ServerName sn =
421             ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
422                 System.currentTimeMillis());
423         try {
424           BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
425               User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
426           AuthenticationProtos.AuthenticationService.BlockingInterface stub =
427               AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
428           AuthenticationProtos.WhoAmIResponse response =
429               stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
430           String myname = response.getUsername();
431           assertEquals("testuser", myname);
432           String authMethod = response.getAuthMethod();
433           assertEquals("TOKEN", authMethod);
434         } finally {
435           rpcClient.close();
436         }
437         return null;
438       }
439     });
440   }
441 
442   @Test
testUseExistingToken()443   public void testUseExistingToken() throws Exception {
444     User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
445         new String[]{"testgroup"});
446     Token<AuthenticationTokenIdentifier> token =
447         secretManager.generateToken(user.getName());
448     assertNotNull(token);
449     user.addToken(token);
450 
451     // make sure we got a token
452     Token<AuthenticationTokenIdentifier> firstToken =
453         new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
454     assertNotNull(firstToken);
455     assertEquals(token, firstToken);
456 
457     Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
458     try {
459       assertFalse(TokenUtil.addTokenIfMissing(conn, user));
460       // make sure we still have the same token
461       Token<AuthenticationTokenIdentifier> secondToken =
462           new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
463       assertEquals(firstToken, secondToken);
464     } finally {
465       conn.close();
466     }
467   }
468 }
469