1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hbase.ipc;
19 
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 import static org.mockito.Matchers.anyObject;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.internal.verification.VerificationModeFactory.times;
27 
28 import java.io.IOException;
29 import java.net.ConnectException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.SocketTimeoutException;
33 import java.util.ArrayList;
34 import java.util.List;
35 
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellScanner;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.KeyValue;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.client.MetricsConnection;
47 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
48 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
49 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
50 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
51 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
52 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
53 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
54 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
55 import org.apache.hadoop.hbase.security.User;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.apache.hadoop.hbase.util.Pair;
58 import org.apache.hadoop.io.compress.GzipCodec;
59 import org.apache.hadoop.util.StringUtils;
60 import org.junit.Assert;
61 import org.junit.Test;
62 
63 import com.google.common.collect.ImmutableList;
64 import com.google.common.collect.Lists;
65 import com.google.protobuf.BlockingRpcChannel;
66 import com.google.protobuf.BlockingService;
67 import com.google.protobuf.Descriptors.MethodDescriptor;
68 import com.google.protobuf.Message;
69 import com.google.protobuf.RpcController;
70 import com.google.protobuf.ServiceException;
71 
72 /**
73  * Some basic ipc tests.
74  */
75 public abstract class AbstractTestIPC {
76 
77   private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
78 
79   private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
80   private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
81   static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
82   static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
83   static final Configuration CONF = HBaseConfiguration.create();
84   // We are using the test TestRpcServiceProtos generated classes and Service because they are
85   // available and basic with methods like 'echo', and ping. Below we make a blocking service
86   // by passing in implementation of blocking interface. We use this service in all tests that
87   // follow.
88   static final BlockingService SERVICE =
89       TestRpcServiceProtos.TestProtobufRpcProto
90           .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
91 
92             @Override
93             public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
94                 throws ServiceException {
95               return null;
96             }
97 
98             @Override
99             public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
100                 throws ServiceException {
101               return null;
102             }
103 
104             @Override
105             public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
106                 throws ServiceException {
107               if (controller instanceof PayloadCarryingRpcController) {
108                 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
109                 // If cells, scan them to check we are able to iterate what we were given and since
110                 // this is
111                 // an echo, just put them back on the controller creating a new block. Tests our
112                 // block
113                 // building.
114                 CellScanner cellScanner = pcrc.cellScanner();
115                 List<Cell> list = null;
116                 if (cellScanner != null) {
117                   list = new ArrayList<Cell>();
118                   try {
119                     while (cellScanner.advance()) {
120                       list.add(cellScanner.current());
121                     }
122                   } catch (IOException e) {
123                     throw new ServiceException(e);
124                   }
125                 }
126                 cellScanner = CellUtil.createCellScanner(list);
127                 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
128               }
129               return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
130             }
131           });
132 
133   /**
134    * Instance of server. We actually don't do anything speical in here so could just use
135    * HBaseRpcServer directly.
136    */
137   static class TestRpcServer extends RpcServer {
138 
TestRpcServer()139     TestRpcServer() throws IOException {
140       this(new FifoRpcScheduler(CONF, 1));
141     }
142 
TestRpcServer(RpcScheduler scheduler)143     TestRpcServer(RpcScheduler scheduler) throws IOException {
144       super(null, "testRpcServer", Lists
145           .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
146           "localhost", 0), CONF, scheduler);
147     }
148 
149     @Override
call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)150     public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
151         Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
152         throws IOException {
153       return super.call(service, md, param, cellScanner, receiveTime, status);
154     }
155   }
156 
createRpcClientNoCodec(Configuration conf)157   protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
158 
159   /**
160    * Ensure we do not HAVE TO HAVE a codec.
161    * @throws InterruptedException
162    * @throws IOException
163    */
164   @Test
testNoCodec()165   public void testNoCodec() throws InterruptedException, IOException {
166     Configuration conf = HBaseConfiguration.create();
167     AbstractRpcClient client = createRpcClientNoCodec(conf);
168     TestRpcServer rpcServer = new TestRpcServer();
169     try {
170       rpcServer.start();
171       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
172       final String message = "hello";
173       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
174       InetSocketAddress address = rpcServer.getListenerAddress();
175       if (address == null) {
176         throw new IOException("Listener channel is closed");
177       }
178       Pair<Message, CellScanner> r =
179           client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
180               new MetricsConnection.CallStats());
181       assertTrue(r.getSecond() == null);
182       // Silly assertion that the message is in the returned pb.
183       assertTrue(r.getFirst().toString().contains(message));
184     } finally {
185       client.close();
186       rpcServer.stop();
187     }
188   }
189 
createRpcClient(Configuration conf)190   protected abstract AbstractRpcClient createRpcClient(Configuration conf);
191 
192   /**
193    * It is hard to verify the compression is actually happening under the wraps. Hope that if
194    * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
195    * confirm that compression is happening down in the client and server).
196    * @throws IOException
197    * @throws InterruptedException
198    * @throws SecurityException
199    * @throws NoSuchMethodException
200    */
201   @Test
testCompressCellBlock()202   public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
203       NoSuchMethodException, ServiceException {
204     Configuration conf = new Configuration(HBaseConfiguration.create());
205     conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
206     List<Cell> cells = new ArrayList<Cell>();
207     int count = 3;
208     for (int i = 0; i < count; i++) {
209       cells.add(CELL);
210     }
211     AbstractRpcClient client = createRpcClient(conf);
212     TestRpcServer rpcServer = new TestRpcServer();
213     try {
214       rpcServer.start();
215       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
216       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
217       PayloadCarryingRpcController pcrc =
218           new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
219       InetSocketAddress address = rpcServer.getListenerAddress();
220       if (address == null) {
221         throw new IOException("Listener channel is closed");
222       }
223       Pair<Message, CellScanner> r =
224           client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
225               new MetricsConnection.CallStats());
226       int index = 0;
227       while (r.getSecond().advance()) {
228         assertTrue(CELL.equals(r.getSecond().current()));
229         index++;
230       }
231       assertEquals(count, index);
232     } finally {
233       client.close();
234       rpcServer.stop();
235     }
236   }
237 
createRpcClientRTEDuringConnectionSetup(Configuration conf)238   protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
239       throws IOException;
240 
241   @Test
testRTEDuringConnectionSetup()242   public void testRTEDuringConnectionSetup() throws Exception {
243     Configuration conf = HBaseConfiguration.create();
244     TestRpcServer rpcServer = new TestRpcServer();
245     AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
246     try {
247       rpcServer.start();
248       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
249       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
250       InetSocketAddress address = rpcServer.getListenerAddress();
251       if (address == null) {
252         throw new IOException("Listener channel is closed");
253       }
254       client.call(null, md, param, null, User.getCurrent(), address,
255           new MetricsConnection.CallStats());
256       fail("Expected an exception to have been thrown!");
257     } catch (Exception e) {
258       LOG.info("Caught expected exception: " + e.toString());
259       assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
260     } finally {
261       client.close();
262       rpcServer.stop();
263     }
264   }
265 
266   /** Tests that the rpc scheduler is called when requests arrive. */
267   @Test
testRpcScheduler()268   public void testRpcScheduler() throws IOException, InterruptedException {
269     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
270     RpcServer rpcServer = new TestRpcServer(scheduler);
271     verify(scheduler).init((RpcScheduler.Context) anyObject());
272     AbstractRpcClient client = createRpcClient(CONF);
273     try {
274       rpcServer.start();
275       verify(scheduler).start();
276       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
277       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
278       InetSocketAddress address = rpcServer.getListenerAddress();
279       if (address == null) {
280         throw new IOException("Listener channel is closed");
281       }
282       for (int i = 0; i < 10; i++) {
283         client.call(new PayloadCarryingRpcController(
284             CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
285             md.getOutputType().toProto(), User.getCurrent(), address,
286             new MetricsConnection.CallStats());
287       }
288       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
289     } finally {
290       rpcServer.stop();
291       verify(scheduler).stop();
292     }
293   }
294 
295   /**
296    * Instance of RpcServer that echoes client hostAddress back to client
297    */
298   static class TestRpcServer1 extends RpcServer {
299 
300     private static BlockingInterface SERVICE1 =
301         new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
302           @Override
303           public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
304               throws ServiceException {
305             return EmptyResponseProto.newBuilder().build();
306           }
307 
308           @Override
309           public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
310               throws ServiceException {
311             final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
312             final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
313             return EchoResponseProto.newBuilder().setMessage(message).build();
314           }
315 
316           @Override
317           public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
318               throws ServiceException {
319             throw new ServiceException("error", new IOException("error"));
320           }
321         };
322 
TestRpcServer1()323     TestRpcServer1() throws IOException {
324       this(new FifoRpcScheduler(CONF, 1));
325     }
326 
TestRpcServer1(RpcScheduler scheduler)327     TestRpcServer1(RpcScheduler scheduler) throws IOException {
328       super(null, "testRemoteAddressInCallObject", Lists
329           .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
330               .newReflectiveBlockingService(SERVICE1), null)),
331           new InetSocketAddress("localhost", 0), CONF, scheduler);
332     }
333   }
334 
335   /**
336    * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
337    * remoteAddress set to its Call Object
338    * @throws ServiceException
339    */
340   @Test
testRpcServerForNotNullRemoteAddressInCallObject()341   public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
342       ServiceException {
343     final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
344     final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
345     final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
346     final AbstractRpcClient client =
347         new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
348     try {
349       rpcServer.start();
350       final InetSocketAddress isa = rpcServer.getListenerAddress();
351       if (isa == null) {
352         throw new IOException("Listener channel is closed");
353       }
354       final BlockingRpcChannel channel =
355           client.createBlockingRpcChannel(
356             ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
357             User.getCurrent(), 0);
358       TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
359           TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
360       final EchoRequestProto echoRequest =
361           EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
362       final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
363       Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
364     } finally {
365       client.close();
366       rpcServer.stop();
367     }
368   }
369 
370   @Test
testWrapException()371   public void testWrapException() throws Exception {
372     AbstractRpcClient client =
373         (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
374     final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
375     assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
376     assertTrue(client.wrapException(address,
377       new SocketTimeoutException()) instanceof SocketTimeoutException);
378     assertTrue(client.wrapException(address, new ConnectionClosingException(
379         "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
380     assertTrue(client
381         .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
382         .getCause() instanceof CallTimeoutException);
383   }
384 }
385