1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.security; 20 21 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; 22 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; 23 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; 24 import static org.junit.Assert.assertEquals; 25 import static org.junit.Assert.assertNotSame; 26 import static org.junit.Assert.assertSame; 27 28 import java.io.File; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.Properties; 34 import java.util.concurrent.ThreadLocalRandom; 35 36 import com.google.protobuf.RpcController; 37 import com.google.protobuf.ServiceException; 38 import org.apache.hadoop.conf.Configuration; 39 import org.apache.hadoop.fs.CommonConfigurationKeys; 40 import org.apache.hadoop.hbase.Cell; 41 import org.apache.hadoop.hbase.CellScanner; 42 import org.apache.hadoop.hbase.CellUtil; 43 import org.apache.hadoop.hbase.HBaseTestingUtility; 44 import org.apache.hadoop.hbase.HConstants; 45 import org.apache.hadoop.hbase.ServerName; 46 import org.apache.hadoop.hbase.ipc.AsyncRpcClient; 47 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; 48 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; 49 import org.apache.hadoop.hbase.ipc.RpcClient; 50 import org.apache.hadoop.hbase.ipc.RpcClientFactory; 51 import org.apache.hadoop.hbase.ipc.RpcClientImpl; 52 import org.apache.hadoop.hbase.ipc.RpcServer; 53 import org.apache.hadoop.hbase.ipc.RpcServerInterface; 54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; 55 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; 56 import org.apache.hadoop.hbase.testclassification.SmallTests; 57 import org.apache.hadoop.minikdc.MiniKdc; 58 import org.apache.hadoop.security.UserGroupInformation; 59 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 60 import org.junit.AfterClass; 61 import org.junit.BeforeClass; 62 import org.junit.Test; 63 import org.junit.experimental.categories.Category; 64 import org.mockito.Mockito; 65 66 import com.google.common.collect.Lists; 67 import com.google.protobuf.BlockingRpcChannel; 68 import com.google.protobuf.BlockingService; 69 70 @Category(SmallTests.class) 71 public class TestSecureRPC { 72 73 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 74 75 private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri() 76 .getPath()); 77 78 static final BlockingService SERVICE = 79 TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService( 80 new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { 81 82 @Override 83 public TestProtos.EmptyResponseProto ping(RpcController controller, 84 TestProtos.EmptyRequestProto request) 85 throws ServiceException { 86 return null; 87 } 88 89 @Override 90 public TestProtos.EmptyResponseProto error(RpcController controller, 91 TestProtos.EmptyRequestProto request) 92 throws ServiceException { 93 return null; 94 } 95 96 @Override 97 public TestProtos.EchoResponseProto echo(RpcController controller, 98 TestProtos.EchoRequestProto request) 99 throws ServiceException { 100 if (controller instanceof PayloadCarryingRpcController) { 101 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; 102 // If cells, scan them to check we are able to iterate what we were given and since 103 // this is 104 // an echo, just put them back on the controller creating a new block. Tests our 105 // block 106 // building. 107 CellScanner cellScanner = pcrc.cellScanner(); 108 List<Cell> list = null; 109 if (cellScanner != null) { 110 list = new ArrayList<Cell>(); 111 try { 112 while (cellScanner.advance()) { 113 list.add(cellScanner.current()); 114 } 115 } catch (IOException e) { 116 throw new ServiceException(e); 117 } 118 } 119 cellScanner = CellUtil.createCellScanner(list); 120 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); 121 } 122 return TestProtos.EchoResponseProto.newBuilder() 123 .setMessage(request.getMessage()).build(); 124 } 125 }); 126 127 private static MiniKdc KDC; 128 129 private static String HOST = "localhost"; 130 131 private static String PRINCIPAL; 132 133 @BeforeClass setUp()134 public static void setUp() throws Exception { 135 Properties conf = MiniKdc.createConf(); 136 conf.put(MiniKdc.DEBUG, true); 137 KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath())); 138 KDC.start(); 139 PRINCIPAL = "hbase/" + HOST; 140 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); 141 HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); 142 HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); 143 } 144 145 @AfterClass tearDown()146 public static void tearDown() throws IOException { 147 if (KDC != null) { 148 KDC.stop(); 149 } 150 TEST_UTIL.cleanupTestDir(); 151 } 152 153 @Test testRpc()154 public void testRpc() throws Exception { 155 testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class); 156 } 157 158 @Test testRpcWithInsecureFallback()159 public void testRpcWithInsecureFallback() throws Exception { 160 testRpcFallbackToSimpleAuth(RpcClientImpl.class); 161 } 162 163 @Test testAsyncRpc()164 public void testAsyncRpc() throws Exception { 165 testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class); 166 } 167 168 @Test testAsyncRpcWithInsecureFallback()169 public void testAsyncRpcWithInsecureFallback() throws Exception { 170 testRpcFallbackToSimpleAuth(AsyncRpcClient.class); 171 } 172 testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)173 private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass) 174 throws Exception { 175 String krbKeytab = getKeytabFileForTesting(); 176 String krbPrincipal = getPrincipalForTesting(); 177 178 UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); 179 UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser(); 180 181 // check that the login user is okay: 182 assertSame(ugi, ugi2); 183 assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); 184 assertEquals(krbPrincipal, ugi.getUserName()); 185 186 Configuration clientConf = getSecuredConfiguration(); 187 callRpcService(rpcImplClass, User.create(ugi2), clientConf, false); 188 } 189 loginKerberosPrincipal(String krbKeytab, String krbPrincipal)190 private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal) 191 throws Exception { 192 Configuration cnf = new Configuration(); 193 cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); 194 UserGroupInformation.setConfiguration(cnf); 195 UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab); 196 return UserGroupInformation.getLoginUser(); 197 } 198 callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser, Configuration clientConf, boolean allowInsecureFallback)199 private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser, 200 Configuration clientConf, boolean allowInsecureFallback) 201 throws Exception { 202 Configuration clientConfCopy = new Configuration(clientConf); 203 clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName()); 204 205 Configuration conf = getSecuredConfiguration(); 206 conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback); 207 208 SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); 209 Mockito.when(securityInfoMock.getServerPrincipal()) 210 .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); 211 SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); 212 213 InetSocketAddress isa = new InetSocketAddress(HOST, 0); 214 215 RpcServerInterface rpcServer = 216 new RpcServer(null, "AbstractTestSecureIPC", 217 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa, 218 conf, new FifoRpcScheduler(conf, 1)); 219 rpcServer.start(); 220 try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, 221 HConstants.DEFAULT_CLUSTER_ID.toString())) { 222 InetSocketAddress address = rpcServer.getListenerAddress(); 223 if (address == null) { 224 throw new IOException("Listener channel is closed"); 225 } 226 BlockingRpcChannel channel = 227 rpcClient.createBlockingRpcChannel( 228 229 ServerName.valueOf(address.getHostName(), address.getPort(), 230 System.currentTimeMillis()), clientUser, 5000); 231 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = 232 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); 233 List<String> results = new ArrayList<String>(); 234 TestThread th1 = new TestThread(stub, results); 235 th1.start(); 236 th1.join(); 237 238 } finally { 239 rpcServer.stop(); 240 } 241 } 242 testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass)243 public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception { 244 String krbKeytab = getKeytabFileForTesting(); 245 String krbPrincipal = getPrincipalForTesting(); 246 247 UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); 248 assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod()); 249 assertEquals(krbPrincipal, ugi.getUserName()); 250 251 String clientUsername = "testuser"; 252 UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername, 253 new String[]{clientUsername}); 254 255 // check that the client user is insecure 256 assertNotSame(ugi, clientUgi); 257 assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod()); 258 assertEquals(clientUsername, clientUgi.getUserName()); 259 260 Configuration clientConf = new Configuration(); 261 clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 262 callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true); 263 } 264 265 public static class TestThread extends Thread { 266 private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; 267 268 private final List<String> results; 269 TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results)270 public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) { 271 this.stub = stub; 272 this.results = results; 273 } 274 275 @Override run()276 public void run() { 277 String result; 278 try { 279 result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf( 280 ThreadLocalRandom.current().nextInt())).build()).getMessage(); 281 } catch (ServiceException e) { 282 throw new RuntimeException(e); 283 } 284 if (results != null) { 285 synchronized (results) { 286 results.add(result); 287 } 288 } 289 } 290 } 291 } 292