1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.ipc; 19 20 import static org.junit.Assert.assertEquals; 21 import static org.junit.Assert.assertTrue; 22 import static org.junit.Assert.fail; 23 import static org.mockito.Matchers.anyObject; 24 import static org.mockito.Mockito.spy; 25 import static org.mockito.Mockito.verify; 26 import static org.mockito.internal.verification.VerificationModeFactory.times; 27 28 import java.io.IOException; 29 import java.net.ConnectException; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.SocketTimeoutException; 33 import java.util.ArrayList; 34 import java.util.List; 35 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 import org.apache.hadoop.conf.Configuration; 39 import org.apache.hadoop.hbase.Cell; 40 import org.apache.hadoop.hbase.CellScanner; 41 import org.apache.hadoop.hbase.CellUtil; 42 import org.apache.hadoop.hbase.HBaseConfiguration; 43 import org.apache.hadoop.hbase.HConstants; 44 import org.apache.hadoop.hbase.KeyValue; 45 import org.apache.hadoop.hbase.ServerName; 46 import org.apache.hadoop.hbase.client.MetricsConnection; 47 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; 48 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; 49 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; 50 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 51 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 52 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; 53 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 54 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; 55 import org.apache.hadoop.hbase.security.User; 56 import org.apache.hadoop.hbase.util.Bytes; 57 import org.apache.hadoop.hbase.util.Pair; 58 import org.apache.hadoop.io.compress.GzipCodec; 59 import org.apache.hadoop.util.StringUtils; 60 import org.junit.Assert; 61 import org.junit.Test; 62 63 import com.google.common.collect.ImmutableList; 64 import com.google.common.collect.Lists; 65 import com.google.protobuf.BlockingRpcChannel; 66 import com.google.protobuf.BlockingService; 67 import com.google.protobuf.Descriptors.MethodDescriptor; 68 import com.google.protobuf.Message; 69 import com.google.protobuf.RpcController; 70 import com.google.protobuf.ServiceException; 71 72 /** 73 * Some basic ipc tests. 74 */ 75 public abstract class AbstractTestIPC { 76 77 private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class); 78 79 private static byte[] CELL_BYTES = Bytes.toBytes("xyz"); 80 private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); 81 static byte[] BIG_CELL_BYTES = new byte[10 * 1024]; 82 static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); 83 static final Configuration CONF = HBaseConfiguration.create(); 84 // We are using the test TestRpcServiceProtos generated classes and Service because they are 85 // available and basic with methods like 'echo', and ping. Below we make a blocking service 86 // by passing in implementation of blocking interface. We use this service in all tests that 87 // follow. 88 static final BlockingService SERVICE = 89 TestRpcServiceProtos.TestProtobufRpcProto 90 .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { 91 92 @Override 93 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) 94 throws ServiceException { 95 return null; 96 } 97 98 @Override 99 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) 100 throws ServiceException { 101 return null; 102 } 103 104 @Override 105 public EchoResponseProto echo(RpcController controller, EchoRequestProto request) 106 throws ServiceException { 107 if (controller instanceof PayloadCarryingRpcController) { 108 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; 109 // If cells, scan them to check we are able to iterate what we were given and since 110 // this is 111 // an echo, just put them back on the controller creating a new block. Tests our 112 // block 113 // building. 114 CellScanner cellScanner = pcrc.cellScanner(); 115 List<Cell> list = null; 116 if (cellScanner != null) { 117 list = new ArrayList<Cell>(); 118 try { 119 while (cellScanner.advance()) { 120 list.add(cellScanner.current()); 121 } 122 } catch (IOException e) { 123 throw new ServiceException(e); 124 } 125 } 126 cellScanner = CellUtil.createCellScanner(list); 127 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); 128 } 129 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); 130 } 131 }); 132 133 /** 134 * Instance of server. We actually don't do anything speical in here so could just use 135 * HBaseRpcServer directly. 136 */ 137 static class TestRpcServer extends RpcServer { 138 TestRpcServer()139 TestRpcServer() throws IOException { 140 this(new FifoRpcScheduler(CONF, 1)); 141 } 142 TestRpcServer(RpcScheduler scheduler)143 TestRpcServer(RpcScheduler scheduler) throws IOException { 144 super(null, "testRpcServer", Lists 145 .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( 146 "localhost", 0), CONF, scheduler); 147 } 148 149 @Override call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)150 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, 151 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) 152 throws IOException { 153 return super.call(service, md, param, cellScanner, receiveTime, status); 154 } 155 } 156 createRpcClientNoCodec(Configuration conf)157 protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf); 158 159 /** 160 * Ensure we do not HAVE TO HAVE a codec. 161 * @throws InterruptedException 162 * @throws IOException 163 */ 164 @Test testNoCodec()165 public void testNoCodec() throws InterruptedException, IOException { 166 Configuration conf = HBaseConfiguration.create(); 167 AbstractRpcClient client = createRpcClientNoCodec(conf); 168 TestRpcServer rpcServer = new TestRpcServer(); 169 try { 170 rpcServer.start(); 171 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); 172 final String message = "hello"; 173 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); 174 InetSocketAddress address = rpcServer.getListenerAddress(); 175 if (address == null) { 176 throw new IOException("Listener channel is closed"); 177 } 178 Pair<Message, CellScanner> r = 179 client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, 180 new MetricsConnection.CallStats()); 181 assertTrue(r.getSecond() == null); 182 // Silly assertion that the message is in the returned pb. 183 assertTrue(r.getFirst().toString().contains(message)); 184 } finally { 185 client.close(); 186 rpcServer.stop(); 187 } 188 } 189 createRpcClient(Configuration conf)190 protected abstract AbstractRpcClient createRpcClient(Configuration conf); 191 192 /** 193 * It is hard to verify the compression is actually happening under the wraps. Hope that if 194 * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to 195 * confirm that compression is happening down in the client and server). 196 * @throws IOException 197 * @throws InterruptedException 198 * @throws SecurityException 199 * @throws NoSuchMethodException 200 */ 201 @Test testCompressCellBlock()202 public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, 203 NoSuchMethodException, ServiceException { 204 Configuration conf = new Configuration(HBaseConfiguration.create()); 205 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); 206 List<Cell> cells = new ArrayList<Cell>(); 207 int count = 3; 208 for (int i = 0; i < count; i++) { 209 cells.add(CELL); 210 } 211 AbstractRpcClient client = createRpcClient(conf); 212 TestRpcServer rpcServer = new TestRpcServer(); 213 try { 214 rpcServer.start(); 215 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); 216 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 217 PayloadCarryingRpcController pcrc = 218 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); 219 InetSocketAddress address = rpcServer.getListenerAddress(); 220 if (address == null) { 221 throw new IOException("Listener channel is closed"); 222 } 223 Pair<Message, CellScanner> r = 224 client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, 225 new MetricsConnection.CallStats()); 226 int index = 0; 227 while (r.getSecond().advance()) { 228 assertTrue(CELL.equals(r.getSecond().current())); 229 index++; 230 } 231 assertEquals(count, index); 232 } finally { 233 client.close(); 234 rpcServer.stop(); 235 } 236 } 237 createRpcClientRTEDuringConnectionSetup(Configuration conf)238 protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) 239 throws IOException; 240 241 @Test testRTEDuringConnectionSetup()242 public void testRTEDuringConnectionSetup() throws Exception { 243 Configuration conf = HBaseConfiguration.create(); 244 TestRpcServer rpcServer = new TestRpcServer(); 245 AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf); 246 try { 247 rpcServer.start(); 248 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); 249 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 250 InetSocketAddress address = rpcServer.getListenerAddress(); 251 if (address == null) { 252 throw new IOException("Listener channel is closed"); 253 } 254 client.call(null, md, param, null, User.getCurrent(), address, 255 new MetricsConnection.CallStats()); 256 fail("Expected an exception to have been thrown!"); 257 } catch (Exception e) { 258 LOG.info("Caught expected exception: " + e.toString()); 259 assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); 260 } finally { 261 client.close(); 262 rpcServer.stop(); 263 } 264 } 265 266 /** Tests that the rpc scheduler is called when requests arrive. */ 267 @Test testRpcScheduler()268 public void testRpcScheduler() throws IOException, InterruptedException { 269 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); 270 RpcServer rpcServer = new TestRpcServer(scheduler); 271 verify(scheduler).init((RpcScheduler.Context) anyObject()); 272 AbstractRpcClient client = createRpcClient(CONF); 273 try { 274 rpcServer.start(); 275 verify(scheduler).start(); 276 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); 277 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 278 InetSocketAddress address = rpcServer.getListenerAddress(); 279 if (address == null) { 280 throw new IOException("Listener channel is closed"); 281 } 282 for (int i = 0; i < 10; i++) { 283 client.call(new PayloadCarryingRpcController( 284 CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, 285 md.getOutputType().toProto(), User.getCurrent(), address, 286 new MetricsConnection.CallStats()); 287 } 288 verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); 289 } finally { 290 rpcServer.stop(); 291 verify(scheduler).stop(); 292 } 293 } 294 295 /** 296 * Instance of RpcServer that echoes client hostAddress back to client 297 */ 298 static class TestRpcServer1 extends RpcServer { 299 300 private static BlockingInterface SERVICE1 = 301 new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { 302 @Override 303 public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) 304 throws ServiceException { 305 return EmptyResponseProto.newBuilder().build(); 306 } 307 308 @Override 309 public EchoResponseProto echo(RpcController unused, EchoRequestProto request) 310 throws ServiceException { 311 final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress(); 312 final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress(); 313 return EchoResponseProto.newBuilder().setMessage(message).build(); 314 } 315 316 @Override 317 public EmptyResponseProto error(RpcController unused, EmptyRequestProto request) 318 throws ServiceException { 319 throw new ServiceException("error", new IOException("error")); 320 } 321 }; 322 TestRpcServer1()323 TestRpcServer1() throws IOException { 324 this(new FifoRpcScheduler(CONF, 1)); 325 } 326 TestRpcServer1(RpcScheduler scheduler)327 TestRpcServer1(RpcScheduler scheduler) throws IOException { 328 super(null, "testRemoteAddressInCallObject", Lists 329 .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto 330 .newReflectiveBlockingService(SERVICE1), null)), 331 new InetSocketAddress("localhost", 0), CONF, scheduler); 332 } 333 } 334 335 /** 336 * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null 337 * remoteAddress set to its Call Object 338 * @throws ServiceException 339 */ 340 @Test testRpcServerForNotNullRemoteAddressInCallObject()341 public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, 342 ServiceException { 343 final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1); 344 final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler); 345 final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); 346 final AbstractRpcClient client = 347 new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null); 348 try { 349 rpcServer.start(); 350 final InetSocketAddress isa = rpcServer.getListenerAddress(); 351 if (isa == null) { 352 throw new IOException("Listener channel is closed"); 353 } 354 final BlockingRpcChannel channel = 355 client.createBlockingRpcChannel( 356 ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()), 357 User.getCurrent(), 0); 358 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = 359 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); 360 final EchoRequestProto echoRequest = 361 EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build(); 362 final EchoResponseProto echoResponse = stub.echo(null, echoRequest); 363 Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage()); 364 } finally { 365 client.close(); 366 rpcServer.stop(); 367 } 368 } 369 370 @Test testWrapException()371 public void testWrapException() throws Exception { 372 AbstractRpcClient client = 373 (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC"); 374 final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); 375 assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException); 376 assertTrue(client.wrapException(address, 377 new SocketTimeoutException()) instanceof SocketTimeoutException); 378 assertTrue(client.wrapException(address, new ConnectionClosingException( 379 "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException); 380 assertTrue(client 381 .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) 382 .getCause() instanceof CallTimeoutException); 383 } 384 } 385