1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT; 21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY; 22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; 23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT; 24 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY; 25 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT; 26 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY; 27 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY; 28 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT; 29 30 import java.io.IOException; 31 import java.lang.reflect.Constructor; 32 import java.lang.reflect.InvocationHandler; 33 import java.lang.reflect.Proxy; 34 import java.net.InetSocketAddress; 35 import java.net.URI; 36 import java.util.HashMap; 37 import java.util.Map; 38 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.atomic.AtomicBoolean; 40 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 import org.apache.hadoop.conf.Configuration; 44 import org.apache.hadoop.hdfs.DFSClient.Conf; 45 import org.apache.hadoop.hdfs.protocol.ClientProtocol; 46 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 47 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; 48 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB; 49 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB; 50 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; 51 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; 52 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; 53 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; 54 import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider; 55 import org.apache.hadoop.hdfs.server.namenode.NameNode; 56 import org.apache.hadoop.hdfs.server.namenode.SafeModeException; 57 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; 58 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; 59 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; 60 import org.apache.hadoop.io.Text; 61 import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider; 62 import org.apache.hadoop.io.retry.FailoverProxyProvider; 63 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; 64 import org.apache.hadoop.io.retry.RetryPolicies; 65 import org.apache.hadoop.io.retry.RetryPolicy; 66 import org.apache.hadoop.io.retry.RetryProxy; 67 import org.apache.hadoop.io.retry.RetryUtils; 68 import org.apache.hadoop.ipc.ProtobufRpcEngine; 69 import org.apache.hadoop.ipc.RPC; 70 import org.apache.hadoop.net.NetUtils; 71 import org.apache.hadoop.security.RefreshUserMappingsProtocol; 72 import org.apache.hadoop.security.SecurityUtil; 73 import org.apache.hadoop.security.UserGroupInformation; 74 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; 75 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB; 76 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB; 77 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; 78 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; 79 import org.apache.hadoop.ipc.RefreshCallQueueProtocol; 80 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; 81 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB; 82 import org.apache.hadoop.tools.GetUserMappingsProtocol; 83 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; 84 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; 85 86 import com.google.common.annotations.VisibleForTesting; 87 import com.google.common.base.Preconditions; 88 89 /** 90 * Create proxy objects to communicate with a remote NN. All remote access to an 91 * NN should be funneled through this class. Most of the time you'll want to use 92 * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}, which will 93 * create either an HA- or non-HA-enabled client proxy as appropriate. 94 */ 95 public class NameNodeProxies { 96 97 private static final Log LOG = LogFactory.getLog(NameNodeProxies.class); 98 99 /** 100 * Wrapper for a client proxy as well as its associated service ID. 101 * This is simply used as a tuple-like return type for 102 * {@link NameNodeProxies#createProxy} and 103 * {@link NameNodeProxies#createNonHAProxy}. 104 */ 105 public static class ProxyAndInfo<PROXYTYPE> { 106 private final PROXYTYPE proxy; 107 private final Text dtService; 108 private final InetSocketAddress address; 109 ProxyAndInfo(PROXYTYPE proxy, Text dtService, InetSocketAddress address)110 public ProxyAndInfo(PROXYTYPE proxy, Text dtService, 111 InetSocketAddress address) { 112 this.proxy = proxy; 113 this.dtService = dtService; 114 this.address = address; 115 } 116 getProxy()117 public PROXYTYPE getProxy() { 118 return proxy; 119 } 120 getDelegationTokenService()121 public Text getDelegationTokenService() { 122 return dtService; 123 } 124 getAddress()125 public InetSocketAddress getAddress() { 126 return address; 127 } 128 } 129 130 /** 131 * Creates the namenode proxy with the passed protocol. This will handle 132 * creation of either HA- or non-HA-enabled proxy objects, depending upon 133 * if the provided URI is a configured logical URI. 134 * 135 * @param conf the configuration containing the required IPC 136 * properties, client failover configurations, etc. 137 * @param nameNodeUri the URI pointing either to a specific NameNode 138 * or to a logical nameservice. 139 * @param xface the IPC interface which should be created 140 * @return an object containing both the proxy and the associated 141 * delegation token service it corresponds to 142 * @throws IOException if there is an error creating the proxy 143 **/ 144 @SuppressWarnings("unchecked") createProxy(Configuration conf, URI nameNodeUri, Class<T> xface)145 public static <T> ProxyAndInfo<T> createProxy(Configuration conf, 146 URI nameNodeUri, Class<T> xface) throws IOException { 147 return createProxy(conf, nameNodeUri, xface, null); 148 } 149 150 /** 151 * Creates the namenode proxy with the passed protocol. This will handle 152 * creation of either HA- or non-HA-enabled proxy objects, depending upon 153 * if the provided URI is a configured logical URI. 154 * 155 * @param conf the configuration containing the required IPC 156 * properties, client failover configurations, etc. 157 * @param nameNodeUri the URI pointing either to a specific NameNode 158 * or to a logical nameservice. 159 * @param xface the IPC interface which should be created 160 * @param fallbackToSimpleAuth set to true or false during calls to indicate if 161 * a secure client falls back to simple auth 162 * @return an object containing both the proxy and the associated 163 * delegation token service it corresponds to 164 * @throws IOException if there is an error creating the proxy 165 **/ 166 @SuppressWarnings("unchecked") createProxy(Configuration conf, URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)167 public static <T> ProxyAndInfo<T> createProxy(Configuration conf, 168 URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth) 169 throws IOException { 170 AbstractNNFailoverProxyProvider<T> failoverProxyProvider = 171 createFailoverProxyProvider(conf, nameNodeUri, xface, true, 172 fallbackToSimpleAuth); 173 174 if (failoverProxyProvider == null) { 175 // Non-HA case 176 return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, 177 UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth); 178 } else { 179 // HA case 180 Conf config = new Conf(conf); 181 T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, 182 RetryPolicies.failoverOnNetworkException( 183 RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, 184 config.maxRetryAttempts, config.failoverSleepBaseMillis, 185 config.failoverSleepMaxMillis)); 186 187 Text dtService; 188 if (failoverProxyProvider.useLogicalURI()) { 189 dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, 190 HdfsConstants.HDFS_URI_SCHEME); 191 } else { 192 dtService = SecurityUtil.buildTokenService( 193 NameNode.getAddress(nameNodeUri)); 194 } 195 return new ProxyAndInfo<T>(proxy, dtService, 196 NameNode.getAddress(nameNodeUri)); 197 } 198 } 199 200 /** 201 * Generate a dummy namenode proxy instance that utilizes our hacked 202 * {@link LossyRetryInvocationHandler}. Proxy instance generated using this 203 * method will proactively drop RPC responses. Currently this method only 204 * support HA setup. null will be returned if the given configuration is not 205 * for HA. 206 * 207 * @param config the configuration containing the required IPC 208 * properties, client failover configurations, etc. 209 * @param nameNodeUri the URI pointing either to a specific NameNode 210 * or to a logical nameservice. 211 * @param xface the IPC interface which should be created 212 * @param numResponseToDrop The number of responses to drop for each RPC call 213 * @param fallbackToSimpleAuth set to true or false during calls to indicate if 214 * a secure client falls back to simple auth 215 * @return an object containing both the proxy and the associated 216 * delegation token service it corresponds to. Will return null of the 217 * given configuration does not support HA. 218 * @throws IOException if there is an error creating the proxy 219 */ 220 @SuppressWarnings("unchecked") createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)221 public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler( 222 Configuration config, URI nameNodeUri, Class<T> xface, 223 int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth) 224 throws IOException { 225 Preconditions.checkArgument(numResponseToDrop > 0); 226 AbstractNNFailoverProxyProvider<T> failoverProxyProvider = 227 createFailoverProxyProvider(config, nameNodeUri, xface, true, 228 fallbackToSimpleAuth); 229 230 if (failoverProxyProvider != null) { // HA case 231 int delay = config.getInt( 232 DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, 233 DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); 234 int maxCap = config.getInt( 235 DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY, 236 DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT); 237 int maxFailoverAttempts = config.getInt( 238 DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 239 DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); 240 int maxRetryAttempts = config.getInt( 241 DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 242 DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT); 243 InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>( 244 numResponseToDrop, failoverProxyProvider, 245 RetryPolicies.failoverOnNetworkException( 246 RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, 247 Math.max(numResponseToDrop + 1, maxRetryAttempts), delay, 248 maxCap)); 249 250 T proxy = (T) Proxy.newProxyInstance( 251 failoverProxyProvider.getInterface().getClassLoader(), 252 new Class[] { xface }, dummyHandler); 253 Text dtService; 254 if (failoverProxyProvider.useLogicalURI()) { 255 dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, 256 HdfsConstants.HDFS_URI_SCHEME); 257 } else { 258 dtService = SecurityUtil.buildTokenService( 259 NameNode.getAddress(nameNodeUri)); 260 } 261 return new ProxyAndInfo<T>(proxy, dtService, 262 NameNode.getAddress(nameNodeUri)); 263 } else { 264 LOG.warn("Currently creating proxy using " + 265 "LossyRetryInvocationHandler requires NN HA setup"); 266 return null; 267 } 268 } 269 270 /** 271 * Creates an explicitly non-HA-enabled proxy object. Most of the time you 272 * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}. 273 * 274 * @param conf the configuration object 275 * @param nnAddr address of the remote NN to connect to 276 * @param xface the IPC interface which should be created 277 * @param ugi the user who is making the calls on the proxy object 278 * @param withRetries certain interfaces have a non-standard retry policy 279 * @return an object containing both the proxy and the associated 280 * delegation token service it corresponds to 281 * @throws IOException 282 */ 283 @SuppressWarnings("unchecked") createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries)284 public static <T> ProxyAndInfo<T> createNonHAProxy( 285 Configuration conf, InetSocketAddress nnAddr, Class<T> xface, 286 UserGroupInformation ugi, boolean withRetries) throws IOException { 287 return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null); 288 } 289 290 /** 291 * Creates an explicitly non-HA-enabled proxy object. Most of the time you 292 * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}. 293 * 294 * @param conf the configuration object 295 * @param nnAddr address of the remote NN to connect to 296 * @param xface the IPC interface which should be created 297 * @param ugi the user who is making the calls on the proxy object 298 * @param withRetries certain interfaces have a non-standard retry policy 299 * @param fallbackToSimpleAuth - set to true or false during this method to 300 * indicate if a secure client falls back to simple auth 301 * @return an object containing both the proxy and the associated 302 * delegation token service it corresponds to 303 * @throws IOException 304 */ 305 @SuppressWarnings("unchecked") createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth)306 public static <T> ProxyAndInfo<T> createNonHAProxy( 307 Configuration conf, InetSocketAddress nnAddr, Class<T> xface, 308 UserGroupInformation ugi, boolean withRetries, 309 AtomicBoolean fallbackToSimpleAuth) throws IOException { 310 Text dtService = SecurityUtil.buildTokenService(nnAddr); 311 312 T proxy; 313 if (xface == ClientProtocol.class) { 314 proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, 315 withRetries, fallbackToSimpleAuth); 316 } else if (xface == JournalProtocol.class) { 317 proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi); 318 } else if (xface == NamenodeProtocol.class) { 319 proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi, 320 withRetries); 321 } else if (xface == GetUserMappingsProtocol.class) { 322 proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi); 323 } else if (xface == RefreshUserMappingsProtocol.class) { 324 proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi); 325 } else if (xface == RefreshAuthorizationPolicyProtocol.class) { 326 proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr, 327 conf, ugi); 328 } else if (xface == RefreshCallQueueProtocol.class) { 329 proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi); 330 } else { 331 String message = "Unsupported protocol found when creating the proxy " + 332 "connection to NameNode: " + 333 ((xface != null) ? xface.getClass().getName() : "null"); 334 LOG.error(message); 335 throw new IllegalStateException(message); 336 } 337 338 return new ProxyAndInfo<T>(proxy, dtService, nnAddr); 339 } 340 createNNProxyWithJournalProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi)341 private static JournalProtocol createNNProxyWithJournalProtocol( 342 InetSocketAddress address, Configuration conf, UserGroupInformation ugi) 343 throws IOException { 344 JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address, 345 conf, ugi, JournalProtocolPB.class); 346 return new JournalProtocolTranslatorPB(proxy); 347 } 348 349 private static RefreshAuthorizationPolicyProtocol createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi)350 createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address, 351 Configuration conf, UserGroupInformation ugi) throws IOException { 352 RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB) 353 createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class); 354 return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy); 355 } 356 357 private static RefreshUserMappingsProtocol createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi)358 createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address, 359 Configuration conf, UserGroupInformation ugi) throws IOException { 360 RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB) 361 createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class); 362 return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy); 363 } 364 365 private static RefreshCallQueueProtocol createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi)366 createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address, 367 Configuration conf, UserGroupInformation ugi) throws IOException { 368 RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB) 369 createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class); 370 return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy); 371 } 372 createNNProxyWithGetUserMappingsProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi)373 private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol( 374 InetSocketAddress address, Configuration conf, UserGroupInformation ugi) 375 throws IOException { 376 GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB) 377 createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class); 378 return new GetUserMappingsProtocolClientSideTranslatorPB(proxy); 379 } 380 createNNProxyWithNamenodeProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries)381 private static NamenodeProtocol createNNProxyWithNamenodeProtocol( 382 InetSocketAddress address, Configuration conf, UserGroupInformation ugi, 383 boolean withRetries) throws IOException { 384 NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy( 385 address, conf, ugi, NamenodeProtocolPB.class); 386 if (withRetries) { // create the proxy with retries 387 RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200, 388 TimeUnit.MILLISECONDS); 389 Map<String, RetryPolicy> methodNameToPolicyMap 390 = new HashMap<String, RetryPolicy>(); 391 methodNameToPolicyMap.put("getBlocks", timeoutPolicy); 392 methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy); 393 NamenodeProtocol translatorProxy = 394 new NamenodeProtocolTranslatorPB(proxy); 395 return (NamenodeProtocol) RetryProxy.create( 396 NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap); 397 } else { 398 return new NamenodeProtocolTranslatorPB(proxy); 399 } 400 } 401 createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth)402 private static ClientProtocol createNNProxyWithClientProtocol( 403 InetSocketAddress address, Configuration conf, UserGroupInformation ugi, 404 boolean withRetries, AtomicBoolean fallbackToSimpleAuth) 405 throws IOException { 406 RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); 407 408 final RetryPolicy defaultPolicy = 409 RetryUtils.getDefaultRetryPolicy( 410 conf, 411 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 412 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 413 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, 414 DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT, 415 SafeModeException.class); 416 417 final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); 418 ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( 419 ClientNamenodeProtocolPB.class, version, address, ugi, conf, 420 NetUtils.getDefaultSocketFactory(conf), 421 org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy, 422 fallbackToSimpleAuth).getProxy(); 423 424 if (withRetries) { // create the proxy with retries 425 426 Map<String, RetryPolicy> methodNameToPolicyMap 427 = new HashMap<String, RetryPolicy>(); 428 429 ClientProtocol translatorProxy = 430 new ClientNamenodeProtocolTranslatorPB(proxy); 431 return (ClientProtocol) RetryProxy.create( 432 ClientProtocol.class, 433 new DefaultFailoverProxyProvider<ClientProtocol>( 434 ClientProtocol.class, translatorProxy), 435 methodNameToPolicyMap, 436 defaultPolicy); 437 } else { 438 return new ClientNamenodeProtocolTranslatorPB(proxy); 439 } 440 } 441 createNameNodeProxy(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, Class<?> xface)442 private static Object createNameNodeProxy(InetSocketAddress address, 443 Configuration conf, UserGroupInformation ugi, Class<?> xface) 444 throws IOException { 445 RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class); 446 Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address, 447 ugi, conf, NetUtils.getDefaultSocketFactory(conf)); 448 return proxy; 449 } 450 451 /** Gets the configured Failover proxy provider's class */ 452 @VisibleForTesting getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri)453 public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( 454 Configuration conf, URI nameNodeUri) throws IOException { 455 if (nameNodeUri == null) { 456 return null; 457 } 458 String host = nameNodeUri.getHost(); 459 460 String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." 461 + host; 462 try { 463 @SuppressWarnings("unchecked") 464 Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf 465 .getClass(configKey, null, FailoverProxyProvider.class); 466 return ret; 467 } catch (RuntimeException e) { 468 if (e.getCause() instanceof ClassNotFoundException) { 469 throw new IOException("Could not load failover proxy provider class " 470 + conf.get(configKey) + " which is configured for authority " 471 + nameNodeUri, e); 472 } else { 473 throw e; 474 } 475 } 476 } 477 478 /** Creates the Failover proxy provider instance*/ 479 @VisibleForTesting createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth)480 public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( 481 Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, 482 AtomicBoolean fallbackToSimpleAuth) throws IOException { 483 Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; 484 AbstractNNFailoverProxyProvider<T> providerNN; 485 Preconditions.checkArgument( 486 xface.isAssignableFrom(NamenodeProtocols.class), 487 "Interface %s is not a NameNode protocol", xface); 488 try { 489 // Obtain the class of the proxy provider 490 failoverProxyProviderClass = getFailoverProxyProviderClass(conf, 491 nameNodeUri); 492 if (failoverProxyProviderClass == null) { 493 return null; 494 } 495 // Create a proxy provider instance. 496 Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass 497 .getConstructor(Configuration.class, URI.class, Class.class); 498 FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, 499 xface); 500 501 // If the proxy provider is of an old implementation, wrap it. 502 if (!(provider instanceof AbstractNNFailoverProxyProvider)) { 503 providerNN = new WrappedFailoverProxyProvider<T>(provider); 504 } else { 505 providerNN = (AbstractNNFailoverProxyProvider<T>)provider; 506 } 507 } catch (Exception e) { 508 String message = "Couldn't create proxy provider " + failoverProxyProviderClass; 509 if (LOG.isDebugEnabled()) { 510 LOG.debug(message, e); 511 } 512 if (e.getCause() instanceof IOException) { 513 throw (IOException) e.getCause(); 514 } else { 515 throw new IOException(message, e); 516 } 517 } 518 519 // Check the port in the URI, if it is logical. 520 if (checkPort && providerNN.useLogicalURI()) { 521 int port = nameNodeUri.getPort(); 522 if (port > 0 && port != NameNode.DEFAULT_PORT) { 523 // Throwing here without any cleanup is fine since we have not 524 // actually created the underlying proxies yet. 525 throw new IOException("Port " + port + " specified in URI " 526 + nameNodeUri + " but host '" + nameNodeUri.getHost() 527 + "' is a logical (HA) namenode" 528 + " and does not use port information."); 529 } 530 } 531 providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); 532 return providerNN; 533 } 534 535 } 536