1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.security;
20 
21 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
22 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
23 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotSame;
26 import static org.junit.Assert.assertSame;
27 
28 import java.io.File;
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Properties;
34 import java.util.concurrent.ThreadLocalRandom;
35 
36 import com.google.protobuf.RpcController;
37 import com.google.protobuf.ServiceException;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.CommonConfigurationKeys;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellScanner;
42 import org.apache.hadoop.hbase.CellUtil;
43 import org.apache.hadoop.hbase.HBaseTestingUtility;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
47 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
48 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
49 import org.apache.hadoop.hbase.ipc.RpcClient;
50 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
51 import org.apache.hadoop.hbase.ipc.RpcClientImpl;
52 import org.apache.hadoop.hbase.ipc.RpcServer;
53 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
55 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
56 import org.apache.hadoop.hbase.testclassification.SmallTests;
57 import org.apache.hadoop.minikdc.MiniKdc;
58 import org.apache.hadoop.security.UserGroupInformation;
59 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
60 import org.junit.AfterClass;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.junit.experimental.categories.Category;
64 import org.mockito.Mockito;
65 
66 import com.google.common.collect.Lists;
67 import com.google.protobuf.BlockingRpcChannel;
68 import com.google.protobuf.BlockingService;
69 
70 @Category(SmallTests.class)
71 public class TestSecureRPC {
72 
73   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74 
75   private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
76       .getPath());
77 
78   static final BlockingService SERVICE =
79       TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
80           new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
81 
82             @Override
83             public TestProtos.EmptyResponseProto ping(RpcController controller,
84                                                       TestProtos.EmptyRequestProto request)
85                 throws ServiceException {
86               return null;
87             }
88 
89             @Override
90             public TestProtos.EmptyResponseProto error(RpcController controller,
91                                                        TestProtos.EmptyRequestProto request)
92                 throws ServiceException {
93               return null;
94             }
95 
96             @Override
97             public TestProtos.EchoResponseProto echo(RpcController controller,
98                                                      TestProtos.EchoRequestProto request)
99                 throws ServiceException {
100               if (controller instanceof PayloadCarryingRpcController) {
101                 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
102                 // If cells, scan them to check we are able to iterate what we were given and since
103                 // this is
104                 // an echo, just put them back on the controller creating a new block. Tests our
105                 // block
106                 // building.
107                 CellScanner cellScanner = pcrc.cellScanner();
108                 List<Cell> list = null;
109                 if (cellScanner != null) {
110                   list = new ArrayList<Cell>();
111                   try {
112                     while (cellScanner.advance()) {
113                       list.add(cellScanner.current());
114                     }
115                   } catch (IOException e) {
116                     throw new ServiceException(e);
117                   }
118                 }
119                 cellScanner = CellUtil.createCellScanner(list);
120                 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
121               }
122               return TestProtos.EchoResponseProto.newBuilder()
123                   .setMessage(request.getMessage()).build();
124             }
125           });
126 
127   private static MiniKdc KDC;
128 
129   private static String HOST = "localhost";
130 
131   private static String PRINCIPAL;
132 
133   @BeforeClass
setUp()134   public static void setUp() throws Exception {
135     Properties conf = MiniKdc.createConf();
136     conf.put(MiniKdc.DEBUG, true);
137     KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
138     KDC.start();
139     PRINCIPAL = "hbase/" + HOST;
140     KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
141     HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
142     HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
143   }
144 
145   @AfterClass
tearDown()146   public static void tearDown() throws IOException {
147     if (KDC != null) {
148       KDC.stop();
149     }
150     TEST_UTIL.cleanupTestDir();
151   }
152 
153   @Test
testRpc()154   public void testRpc() throws Exception {
155     testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
156   }
157 
158   @Test
testRpcWithInsecureFallback()159   public void testRpcWithInsecureFallback() throws Exception {
160     testRpcFallbackToSimpleAuth(RpcClientImpl.class);
161   }
162 
163   @Test
testAsyncRpc()164   public void testAsyncRpc() throws Exception {
165     testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
166   }
167 
168   @Test
testAsyncRpcWithInsecureFallback()169   public void testAsyncRpcWithInsecureFallback() throws Exception {
170     testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
171   }
172 
testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)173   private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
174       throws Exception {
175     String krbKeytab = getKeytabFileForTesting();
176     String krbPrincipal = getPrincipalForTesting();
177 
178     UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
179     UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
180 
181     // check that the login user is okay:
182     assertSame(ugi, ugi2);
183     assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
184     assertEquals(krbPrincipal, ugi.getUserName());
185 
186     Configuration clientConf = getSecuredConfiguration();
187     callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
188   }
189 
loginKerberosPrincipal(String krbKeytab, String krbPrincipal)190   private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
191       throws Exception {
192     Configuration cnf = new Configuration();
193     cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
194     UserGroupInformation.setConfiguration(cnf);
195     UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
196     return UserGroupInformation.getLoginUser();
197   }
198 
callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser, Configuration clientConf, boolean allowInsecureFallback)199   private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
200                               Configuration clientConf, boolean allowInsecureFallback)
201       throws Exception {
202     Configuration clientConfCopy = new Configuration(clientConf);
203     clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
204 
205     Configuration conf = getSecuredConfiguration();
206     conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
207 
208     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
209     Mockito.when(securityInfoMock.getServerPrincipal())
210         .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
211     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
212 
213     InetSocketAddress isa = new InetSocketAddress(HOST, 0);
214 
215     RpcServerInterface rpcServer =
216         new RpcServer(null, "AbstractTestSecureIPC",
217             Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
218             conf, new FifoRpcScheduler(conf, 1));
219     rpcServer.start();
220     try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
221         HConstants.DEFAULT_CLUSTER_ID.toString())) {
222       InetSocketAddress address = rpcServer.getListenerAddress();
223       if (address == null) {
224         throw new IOException("Listener channel is closed");
225       }
226       BlockingRpcChannel channel =
227           rpcClient.createBlockingRpcChannel(
228 
229             ServerName.valueOf(address.getHostName(), address.getPort(),
230             System.currentTimeMillis()), clientUser, 5000);
231       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
232           TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
233       List<String> results = new ArrayList<String>();
234       TestThread th1 = new TestThread(stub, results);
235       th1.start();
236       th1.join();
237 
238     } finally {
239       rpcServer.stop();
240     }
241   }
242 
testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass)243   public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
244     String krbKeytab = getKeytabFileForTesting();
245     String krbPrincipal = getPrincipalForTesting();
246 
247     UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
248     assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
249     assertEquals(krbPrincipal, ugi.getUserName());
250 
251     String clientUsername = "testuser";
252     UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
253         new String[]{clientUsername});
254 
255     // check that the client user is insecure
256     assertNotSame(ugi, clientUgi);
257     assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
258     assertEquals(clientUsername, clientUgi.getUserName());
259 
260     Configuration clientConf = new Configuration();
261     clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
262     callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
263   }
264 
265   public static class TestThread extends Thread {
266       private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
267 
268       private final List<String> results;
269 
TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results)270           public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
271           this.stub = stub;
272           this.results = results;
273         }
274 
275           @Override
run()276       public void run() {
277           String result;
278           try {
279               result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
280                   ThreadLocalRandom.current().nextInt())).build()).getMessage();
281             } catch (ServiceException e) {
282               throw new RuntimeException(e);
283             }
284           if (results != null) {
285               synchronized (results) {
286                   results.add(result);
287                 }
288             }
289         }
290     }
291 }
292