1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.ipc;
19 
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 
23 import org.junit.Assert;
24 
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
27 import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
28 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
29 import org.apache.hadoop.net.NetUtils;
30 import org.junit.Before;
31 import org.junit.After;
32 import org.junit.Test;
33 import com.google.protobuf.BlockingService;
34 
35 public class TestMultipleProtocolServer {
36   private static final String ADDRESS = "0.0.0.0";
37   private static InetSocketAddress addr;
38   private static RPC.Server server;
39 
40   private static Configuration conf = new Configuration();
41 
42 
43   @ProtocolInfo(protocolName="Foo")
44   interface Foo0 extends VersionedProtocol {
45     public static final long versionID = 0L;
ping()46     String ping() throws IOException;
47 
48   }
49 
50   @ProtocolInfo(protocolName="Foo")
51   interface Foo1 extends VersionedProtocol {
52     public static final long versionID = 1L;
ping()53     String ping() throws IOException;
ping2()54     String ping2() throws IOException;
55   }
56 
57   @ProtocolInfo(protocolName="Foo")
58   interface FooUnimplemented extends VersionedProtocol {
59     public static final long versionID = 2L;
ping()60     String ping() throws IOException;
61   }
62 
63   interface Mixin extends VersionedProtocol{
64     public static final long versionID = 0L;
hello()65     void hello() throws IOException;
66   }
67   interface Bar extends Mixin {
68     public static final long versionID = 0L;
echo(int i)69     int echo(int i) throws IOException;
70   }
71 
72 
73 
74   class Foo0Impl implements Foo0 {
75 
76     @Override
getProtocolVersion(String protocol, long clientVersion)77     public long getProtocolVersion(String protocol, long clientVersion)
78         throws IOException {
79       return Foo0.versionID;
80     }
81 
82     @SuppressWarnings("unchecked")
83     @Override
getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)84     public ProtocolSignature getProtocolSignature(String protocol,
85         long clientVersion, int clientMethodsHash) throws IOException {
86       Class<? extends VersionedProtocol> inter;
87       try {
88         inter = (Class<? extends VersionedProtocol>)getClass().
89                                           getGenericInterfaces()[0];
90       } catch (Exception e) {
91         throw new IOException(e);
92       }
93       return ProtocolSignature.getProtocolSignature(clientMethodsHash,
94           getProtocolVersion(protocol, clientVersion), inter);
95     }
96 
97     @Override
ping()98     public String ping() {
99       return "Foo0";
100     }
101 
102   }
103 
104   class Foo1Impl implements Foo1 {
105 
106     @Override
getProtocolVersion(String protocol, long clientVersion)107     public long getProtocolVersion(String protocol, long clientVersion)
108         throws IOException {
109       return Foo1.versionID;
110     }
111 
112     @SuppressWarnings("unchecked")
113     @Override
getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)114     public ProtocolSignature getProtocolSignature(String protocol,
115         long clientVersion, int clientMethodsHash) throws IOException {
116       Class<? extends VersionedProtocol> inter;
117       try {
118         inter = (Class<? extends VersionedProtocol>)getClass().
119                                         getGenericInterfaces()[0];
120       } catch (Exception e) {
121         throw new IOException(e);
122       }
123       return ProtocolSignature.getProtocolSignature(clientMethodsHash,
124           getProtocolVersion(protocol, clientVersion), inter);
125     }
126 
127     @Override
ping()128     public String ping() {
129       return "Foo1";
130     }
131 
132     @Override
ping2()133     public String ping2() {
134       return "Foo1";
135 
136     }
137 
138   }
139 
140 
141   class BarImpl implements Bar {
142 
143     @Override
getProtocolVersion(String protocol, long clientVersion)144     public long getProtocolVersion(String protocol, long clientVersion)
145         throws IOException {
146       return Bar.versionID;
147     }
148 
149     @SuppressWarnings("unchecked")
150     @Override
getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)151     public ProtocolSignature getProtocolSignature(String protocol,
152         long clientVersion, int clientMethodsHash) throws IOException {
153       Class<? extends VersionedProtocol> inter;
154       try {
155         inter = (Class<? extends VersionedProtocol>)getClass().
156                                           getGenericInterfaces()[0];
157       } catch (Exception e) {
158         throw new IOException(e);
159       }
160       return ProtocolSignature.getProtocolSignature(clientMethodsHash,
161           getProtocolVersion(protocol, clientVersion), inter);
162     }
163 
164     @Override
echo(int i)165     public int echo(int i) {
166       return i;
167     }
168 
169     @Override
hello()170     public void hello() {
171 
172 
173     }
174   }
175   @Before
setUp()176   public void setUp() throws Exception {
177     // create a server with two handlers
178     server = new RPC.Builder(conf).setProtocol(Foo0.class)
179         .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
180         .setNumHandlers(2).setVerbose(false).build();
181     server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
182     server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
183     server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
184 
185 
186     // Add Protobuf server
187     // Create server side implementation
188     PBServerImpl pbServerImpl =
189         new PBServerImpl();
190     BlockingService service = TestProtobufRpcProto
191         .newReflectiveBlockingService(pbServerImpl);
192     server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
193         service);
194     server.start();
195     addr = NetUtils.getConnectAddress(server);
196   }
197 
198   @After
tearDown()199   public void tearDown() throws Exception {
200     server.stop();
201   }
202 
203   @Test
test1()204   public void test1() throws IOException {
205     ProtocolProxy<?> proxy;
206     proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
207 
208     Foo0 foo0 = (Foo0)proxy.getProxy();
209     Assert.assertEquals("Foo0", foo0.ping());
210 
211 
212     proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
213 
214 
215     Foo1 foo1 = (Foo1)proxy.getProxy();
216     Assert.assertEquals("Foo1", foo1.ping());
217     Assert.assertEquals("Foo1", foo1.ping());
218 
219 
220     proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
221 
222 
223     Bar bar = (Bar)proxy.getProxy();
224     Assert.assertEquals(99, bar.echo(99));
225 
226     // Now test Mixin class method
227 
228     Mixin mixin = bar;
229     mixin.hello();
230   }
231 
232 
233   // Server does not implement the FooUnimplemented version of protocol Foo.
234   // See that calls to it fail.
235   @Test(expected=IOException.class)
testNonExistingProtocol()236   public void testNonExistingProtocol() throws IOException {
237     ProtocolProxy<?> proxy;
238     proxy = RPC.getProtocolProxy(FooUnimplemented.class,
239         FooUnimplemented.versionID, addr, conf);
240 
241     FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
242     foo.ping();
243   }
244 
245 
246   /**
247    * getProtocolVersion of an unimplemented version should return highest version
248    * Similarly getProtocolSignature should work.
249    * @throws IOException
250    */
251   @Test
testNonExistingProtocol2()252   public void testNonExistingProtocol2() throws IOException {
253     ProtocolProxy<?> proxy;
254     proxy = RPC.getProtocolProxy(FooUnimplemented.class,
255         FooUnimplemented.versionID, addr, conf);
256 
257     FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
258     Assert.assertEquals(Foo1.versionID,
259         foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
260         FooUnimplemented.versionID));
261     foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
262         FooUnimplemented.versionID, 0);
263   }
264 
265   @Test(expected=IOException.class)
testIncorrectServerCreation()266   public void testIncorrectServerCreation() throws IOException {
267     new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
268         .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
269         .build();
270   }
271 
272   // Now test a PB service - a server  hosts both PB and Writable Rpcs.
273   @Test
testPBService()274   public void testPBService() throws Exception {
275     // Set RPC engine to protobuf RPC engine
276     Configuration conf2 = new Configuration();
277     RPC.setProtocolEngine(conf2, TestRpcService.class,
278         ProtobufRpcEngine.class);
279     TestRpcService client = RPC.getProxy(TestRpcService.class, 0, addr, conf2);
280     TestProtoBufRpc.testProtoBufRpc(client);
281   }
282 }
283