/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.junit.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import com.google.protobuf.BlockingService;

/**
 * Tests a single {@link RPC.Server} that hosts several protocols at once:
 * two versions of the Writable protocol named "Foo", the Writable protocols
 * {@link Bar} and {@link Mixin}, and one protocol-buffer service.
 */
public class TestMultipleProtocolServer {
  private static final String ADDRESS = "0.0.0.0";
  private static InetSocketAddress addr;
  private static RPC.Server server;

  private static final Configuration conf = new Configuration();


  /** Version 0 of the protocol registered under the name "Foo". */
  @ProtocolInfo(protocolName="Foo")
  interface Foo0 extends VersionedProtocol {
    public static final long versionID = 0L;
    String ping() throws IOException;

  }

  /** Version 1 of the protocol "Foo"; adds {@link #ping2()}. */
  @ProtocolInfo(protocolName="Foo")
  interface Foo1 extends VersionedProtocol {
    public static final long versionID = 1L;
    String ping() throws IOException;
    String ping2() throws IOException;
  }

  /** Version 2 of the protocol "Foo"; the test server never registers it. */
  @ProtocolInfo(protocolName="Foo")
  interface FooUnimplemented extends VersionedProtocol {
    public static final long versionID = 2L;
    String ping() throws IOException;
  }

  /** Base protocol that {@link Bar} extends. */
  interface Mixin extends VersionedProtocol{
    public static final long versionID = 0L;
    void hello() throws IOException;
  }

  /** Protocol that inherits {@link Mixin#hello()} and adds {@link #echo(int)}. */
  interface Bar extends Mixin {
    public static final long versionID = 0L;
    int echo(int i) throws IOException;
  }

  /**
   * Computes the protocol signature of a test implementation. This is the
   * logic shared by every {@code getProtocolSignature} override in this file.
   *
   * @param impl the protocol implementation; the first interface named in
   *             its {@code implements} clause is used as the protocol class
   * @param protocol protocol name, forwarded to
   *                 {@link VersionedProtocol#getProtocolVersion(String, long)}
   * @param clientVersion version reported by the client, forwarded likewise
   * @param clientMethodsHash hash of the client's methods
   * @return the signature produced by
   *         {@code ProtocolSignature.getProtocolSignature}
   * @throws IOException if the protocol interface cannot be resolved or the
   *                     version lookup fails
   */
  @SuppressWarnings("unchecked")
  private static ProtocolSignature signatureFor(VersionedProtocol impl,
      String protocol, long clientVersion, int clientMethodsHash)
      throws IOException {
    Class<? extends VersionedProtocol> inter;
    try {
      inter = (Class<? extends VersionedProtocol>)
          impl.getClass().getGenericInterfaces()[0];
    } catch (Exception e) {
      // A missing or non-Class first interface surfaces as an IOException.
      throw new IOException(e);
    }
    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
        impl.getProtocolVersion(protocol, clientVersion), inter);
  }


  /** Server-side implementation of {@link Foo0}. */
  class Foo0Impl implements Foo0 {

    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
        throws IOException {
      return Foo0.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
        long clientVersion, int clientMethodsHash) throws IOException {
      return signatureFor(this, protocol, clientVersion, clientMethodsHash);
    }

    @Override
    public String ping() {
      return "Foo0";
    }

  }

  /** Server-side implementation of {@link Foo1}. */
  class Foo1Impl implements Foo1 {

    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
        throws IOException {
      return Foo1.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
        long clientVersion, int clientMethodsHash) throws IOException {
      return signatureFor(this, protocol, clientVersion, clientMethodsHash);
    }

    @Override
    public String ping() {
      return "Foo1";
    }

    @Override
    public String ping2() {
      return "Foo1";
    }

  }


  /** Server-side implementation of {@link Bar} (and so of {@link Mixin}). */
  class BarImpl implements Bar {

    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
        throws IOException {
      return Bar.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
        long clientVersion, int clientMethodsHash) throws IOException {
      return signatureFor(this, protocol, clientVersion, clientMethodsHash);
    }

    @Override
    public int echo(int i) {
      return i;
    }

    @Override
    public void hello() {
      // Intentionally empty: the test only checks that the call succeeds.
    }
  }

  /**
   * Starts a server that hosts Foo0, Foo1, Bar and Mixin as Writable
   * protocols plus one protocol-buffer service, and records its address.
   */
  @Before
  public void setUp() throws Exception {
    // create a server with two handlers
    server = new RPC.Builder(conf).setProtocol(Foo0.class)
        .setInstance(new Foo0Impl()).setBindAddress(ADDRESS).setPort(0)
        .setNumHandlers(2).setVerbose(false).build();
    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
    server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());


    // Add Protobuf server
    // Create server side implementation
    PBServerImpl pbServerImpl = new PBServerImpl();
    BlockingService service = TestProtobufRpcProto
        .newReflectiveBlockingService(pbServerImpl);
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
        service);
    server.start();
    addr = NetUtils.getConnectAddress(server);
  }

  /** Stops the server started by {@link #setUp()}, if there is one. */
  @After
  public void tearDown() throws Exception {
    // Guard against a setUp() failure that left no server behind, so the
    // original failure is not hidden by a NullPointerException here.
    if (server != null) {
      server.stop();
      server = null;
    }
  }

  /** Calls every method of Foo0, Foo1, Bar and Mixin through client proxies. */
  @Test
  public void test1() throws IOException {
    ProtocolProxy<?> proxy;
    proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);

    Foo0 foo0 = (Foo0)proxy.getProxy();
    Assert.assertEquals("Foo0", foo0.ping());


    proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);


    Foo1 foo1 = (Foo1)proxy.getProxy();
    Assert.assertEquals("Foo1", foo1.ping());
    Assert.assertEquals("Foo1", foo1.ping2());


    proxy = RPC.getProtocolProxy(Bar.class, Bar.versionID, addr, conf);


    Bar bar = (Bar)proxy.getProxy();
    Assert.assertEquals(99, bar.echo(99));

    // Now test Mixin class method

    Mixin mixin = bar;
    mixin.hello();
  }


  // Server does not implement the FooUnimplemented version of protocol Foo.
  // See that calls to it fail.
  @Test(expected=IOException.class)
  public void testNonExistingProtocol() throws IOException {
    ProtocolProxy<?> proxy;
    proxy = RPC.getProtocolProxy(FooUnimplemented.class,
        FooUnimplemented.versionID, addr, conf);

    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
    foo.ping();
  }


  /**
   * getProtocolVersion of an unimplemented version should return the highest
   * implemented version. Similarly, getProtocolSignature should work.
   * @throws IOException
   */
  @Test
  public void testNonExistingProtocol2() throws IOException {
    ProtocolProxy<?> proxy;
    proxy = RPC.getProtocolProxy(FooUnimplemented.class,
        FooUnimplemented.versionID, addr, conf);

    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
    Assert.assertEquals(Foo1.versionID,
        foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
            FooUnimplemented.versionID));
    foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
        FooUnimplemented.versionID, 0);
  }

  /**
   * Building a server for Foo1 with a Foo0Impl instance is expected to fail
   * with an IOException.
   */
  @Test(expected=IOException.class)
  public void testIncorrectServerCreation() throws IOException {
    new RPC.Builder(conf).setProtocol(Foo1.class).setInstance(new Foo0Impl())
        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(2).setVerbose(false)
        .build();
  }

  // Now test a PB service - a server hosts both PB and Writable Rpcs.
  @Test
  public void testPBService() throws Exception {
    // Set RPC engine to protobuf RPC engine
    Configuration conf2 = new Configuration();
    RPC.setProtocolEngine(conf2, TestRpcService.class,
        ProtobufRpcEngine.class);
    TestRpcService client = RPC.getProxy(TestRpcService.class, 0, addr, conf2);
    TestProtoBufRpc.testProtoBufRpc(client);
  }
}