1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
24 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
25 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
26 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
27 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
28 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT;
29 
30 import java.io.IOException;
31 import java.lang.reflect.Constructor;
32 import java.lang.reflect.InvocationHandler;
33 import java.lang.reflect.Proxy;
34 import java.net.InetSocketAddress;
35 import java.net.URI;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.hdfs.DFSClient.Conf;
45 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
46 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
47 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
48 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
49 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
50 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
51 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
52 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
53 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
54 import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider;
55 import org.apache.hadoop.hdfs.server.namenode.NameNode;
56 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
57 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
58 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
59 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
60 import org.apache.hadoop.io.Text;
61 import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
62 import org.apache.hadoop.io.retry.FailoverProxyProvider;
63 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
64 import org.apache.hadoop.io.retry.RetryPolicies;
65 import org.apache.hadoop.io.retry.RetryPolicy;
66 import org.apache.hadoop.io.retry.RetryProxy;
67 import org.apache.hadoop.io.retry.RetryUtils;
68 import org.apache.hadoop.ipc.ProtobufRpcEngine;
69 import org.apache.hadoop.ipc.RPC;
70 import org.apache.hadoop.net.NetUtils;
71 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
72 import org.apache.hadoop.security.SecurityUtil;
73 import org.apache.hadoop.security.UserGroupInformation;
74 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
75 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
76 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
77 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
78 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
79 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
80 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
81 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
82 import org.apache.hadoop.tools.GetUserMappingsProtocol;
83 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
84 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
85 
86 import com.google.common.annotations.VisibleForTesting;
87 import com.google.common.base.Preconditions;
88 
89 /**
90  * Create proxy objects to communicate with a remote NN. All remote access to an
91  * NN should be funneled through this class. Most of the time you'll want to use
92  * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}, which will
93  * create either an HA- or non-HA-enabled client proxy as appropriate.
94  */
95 public class NameNodeProxies {
96 
97   private static final Log LOG = LogFactory.getLog(NameNodeProxies.class);
98 
99   /**
100    * Wrapper for a client proxy as well as its associated service ID.
101    * This is simply used as a tuple-like return type for
102    * {@link NameNodeProxies#createProxy} and
103    * {@link NameNodeProxies#createNonHAProxy}.
104    */
105   public static class ProxyAndInfo<PROXYTYPE> {
106     private final PROXYTYPE proxy;
107     private final Text dtService;
108     private final InetSocketAddress address;
109 
ProxyAndInfo(PROXYTYPE proxy, Text dtService, InetSocketAddress address)110     public ProxyAndInfo(PROXYTYPE proxy, Text dtService,
111         InetSocketAddress address) {
112       this.proxy = proxy;
113       this.dtService = dtService;
114       this.address = address;
115     }
116 
getProxy()117     public PROXYTYPE getProxy() {
118       return proxy;
119     }
120 
getDelegationTokenService()121     public Text getDelegationTokenService() {
122       return dtService;
123     }
124 
getAddress()125     public InetSocketAddress getAddress() {
126       return address;
127     }
128   }
129 
130   /**
131    * Creates the namenode proxy with the passed protocol. This will handle
132    * creation of either HA- or non-HA-enabled proxy objects, depending upon
133    * if the provided URI is a configured logical URI.
134    *
135    * @param conf the configuration containing the required IPC
136    *        properties, client failover configurations, etc.
137    * @param nameNodeUri the URI pointing either to a specific NameNode
138    *        or to a logical nameservice.
139    * @param xface the IPC interface which should be created
140    * @return an object containing both the proxy and the associated
141    *         delegation token service it corresponds to
142    * @throws IOException if there is an error creating the proxy
143    **/
144   @SuppressWarnings("unchecked")
createProxy(Configuration conf, URI nameNodeUri, Class<T> xface)145   public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
146       URI nameNodeUri, Class<T> xface) throws IOException {
147     return createProxy(conf, nameNodeUri, xface, null);
148   }
149 
150   /**
151    * Creates the namenode proxy with the passed protocol. This will handle
152    * creation of either HA- or non-HA-enabled proxy objects, depending upon
153    * if the provided URI is a configured logical URI.
154    *
155    * @param conf the configuration containing the required IPC
156    *        properties, client failover configurations, etc.
157    * @param nameNodeUri the URI pointing either to a specific NameNode
158    *        or to a logical nameservice.
159    * @param xface the IPC interface which should be created
160    * @param fallbackToSimpleAuth set to true or false during calls to indicate if
161    *   a secure client falls back to simple auth
162    * @return an object containing both the proxy and the associated
163    *         delegation token service it corresponds to
164    * @throws IOException if there is an error creating the proxy
165    **/
166   @SuppressWarnings("unchecked")
createProxy(Configuration conf, URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)167   public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
168       URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
169       throws IOException {
170     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
171         createFailoverProxyProvider(conf, nameNodeUri, xface, true,
172           fallbackToSimpleAuth);
173 
174     if (failoverProxyProvider == null) {
175       // Non-HA case
176       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
177           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
178     } else {
179       // HA case
180       Conf config = new Conf(conf);
181       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
182           RetryPolicies.failoverOnNetworkException(
183               RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
184               config.maxRetryAttempts, config.failoverSleepBaseMillis,
185               config.failoverSleepMaxMillis));
186 
187       Text dtService;
188       if (failoverProxyProvider.useLogicalURI()) {
189         dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
190             HdfsConstants.HDFS_URI_SCHEME);
191       } else {
192         dtService = SecurityUtil.buildTokenService(
193             NameNode.getAddress(nameNodeUri));
194       }
195       return new ProxyAndInfo<T>(proxy, dtService,
196           NameNode.getAddress(nameNodeUri));
197     }
198   }
199 
200   /**
201    * Generate a dummy namenode proxy instance that utilizes our hacked
202    * {@link LossyRetryInvocationHandler}. Proxy instance generated using this
203    * method will proactively drop RPC responses. Currently this method only
204    * support HA setup. null will be returned if the given configuration is not
205    * for HA.
206    *
207    * @param config the configuration containing the required IPC
208    *        properties, client failover configurations, etc.
209    * @param nameNodeUri the URI pointing either to a specific NameNode
210    *        or to a logical nameservice.
211    * @param xface the IPC interface which should be created
212    * @param numResponseToDrop The number of responses to drop for each RPC call
213    * @param fallbackToSimpleAuth set to true or false during calls to indicate if
214    *   a secure client falls back to simple auth
215    * @return an object containing both the proxy and the associated
216    *         delegation token service it corresponds to. Will return null of the
217    *         given configuration does not support HA.
218    * @throws IOException if there is an error creating the proxy
219    */
220   @SuppressWarnings("unchecked")
createProxyWithLossyRetryHandler( Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)221   public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
222       Configuration config, URI nameNodeUri, Class<T> xface,
223       int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
224       throws IOException {
225     Preconditions.checkArgument(numResponseToDrop > 0);
226     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
227         createFailoverProxyProvider(config, nameNodeUri, xface, true,
228           fallbackToSimpleAuth);
229 
230     if (failoverProxyProvider != null) { // HA case
231       int delay = config.getInt(
232           DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
233           DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
234       int maxCap = config.getInt(
235           DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
236           DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
237       int maxFailoverAttempts = config.getInt(
238           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
239           DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
240       int maxRetryAttempts = config.getInt(
241           DFS_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
242           DFS_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
243       InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
244               numResponseToDrop, failoverProxyProvider,
245               RetryPolicies.failoverOnNetworkException(
246                   RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
247                   Math.max(numResponseToDrop + 1, maxRetryAttempts), delay,
248                   maxCap));
249 
250       T proxy = (T) Proxy.newProxyInstance(
251           failoverProxyProvider.getInterface().getClassLoader(),
252           new Class[] { xface }, dummyHandler);
253       Text dtService;
254       if (failoverProxyProvider.useLogicalURI()) {
255         dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
256             HdfsConstants.HDFS_URI_SCHEME);
257       } else {
258         dtService = SecurityUtil.buildTokenService(
259             NameNode.getAddress(nameNodeUri));
260       }
261       return new ProxyAndInfo<T>(proxy, dtService,
262           NameNode.getAddress(nameNodeUri));
263     } else {
264       LOG.warn("Currently creating proxy using " +
265       		"LossyRetryInvocationHandler requires NN HA setup");
266       return null;
267     }
268   }
269 
270   /**
271    * Creates an explicitly non-HA-enabled proxy object. Most of the time you
272    * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
273    *
274    * @param conf the configuration object
275    * @param nnAddr address of the remote NN to connect to
276    * @param xface the IPC interface which should be created
277    * @param ugi the user who is making the calls on the proxy object
278    * @param withRetries certain interfaces have a non-standard retry policy
279    * @return an object containing both the proxy and the associated
280    *         delegation token service it corresponds to
281    * @throws IOException
282    */
283   @SuppressWarnings("unchecked")
createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries)284   public static <T> ProxyAndInfo<T> createNonHAProxy(
285       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
286       UserGroupInformation ugi, boolean withRetries) throws IOException {
287     return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
288   }
289 
290   /**
291    * Creates an explicitly non-HA-enabled proxy object. Most of the time you
292    * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
293    *
294    * @param conf the configuration object
295    * @param nnAddr address of the remote NN to connect to
296    * @param xface the IPC interface which should be created
297    * @param ugi the user who is making the calls on the proxy object
298    * @param withRetries certain interfaces have a non-standard retry policy
299    * @param fallbackToSimpleAuth - set to true or false during this method to
300    *   indicate if a secure client falls back to simple auth
301    * @return an object containing both the proxy and the associated
302    *         delegation token service it corresponds to
303    * @throws IOException
304    */
305   @SuppressWarnings("unchecked")
createNonHAProxy( Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth)306   public static <T> ProxyAndInfo<T> createNonHAProxy(
307       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
308       UserGroupInformation ugi, boolean withRetries,
309       AtomicBoolean fallbackToSimpleAuth) throws IOException {
310     Text dtService = SecurityUtil.buildTokenService(nnAddr);
311 
312     T proxy;
313     if (xface == ClientProtocol.class) {
314       proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
315           withRetries, fallbackToSimpleAuth);
316     } else if (xface == JournalProtocol.class) {
317       proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
318     } else if (xface == NamenodeProtocol.class) {
319       proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
320           withRetries);
321     } else if (xface == GetUserMappingsProtocol.class) {
322       proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
323     } else if (xface == RefreshUserMappingsProtocol.class) {
324       proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
325     } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
326       proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
327           conf, ugi);
328     } else if (xface == RefreshCallQueueProtocol.class) {
329       proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
330     } else {
331       String message = "Unsupported protocol found when creating the proxy " +
332           "connection to NameNode: " +
333           ((xface != null) ? xface.getClass().getName() : "null");
334       LOG.error(message);
335       throw new IllegalStateException(message);
336     }
337 
338     return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
339   }
340 
createNNProxyWithJournalProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi)341   private static JournalProtocol createNNProxyWithJournalProtocol(
342       InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
343       throws IOException {
344     JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
345         conf, ugi, JournalProtocolPB.class);
346     return new JournalProtocolTranslatorPB(proxy);
347   }
348 
349   private static RefreshAuthorizationPolicyProtocol
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi)350       createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
351           Configuration conf, UserGroupInformation ugi) throws IOException {
352     RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
353         createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class);
354     return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
355   }
356 
357   private static RefreshUserMappingsProtocol
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi)358       createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
359           Configuration conf, UserGroupInformation ugi) throws IOException {
360     RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
361         createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class);
362     return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
363   }
364 
365   private static RefreshCallQueueProtocol
createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi)366       createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
367           Configuration conf, UserGroupInformation ugi) throws IOException {
368     RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
369         createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class);
370     return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
371   }
372 
createNNProxyWithGetUserMappingsProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi)373   private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
374       InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
375       throws IOException {
376     GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
377         createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class);
378     return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
379   }
380 
createNNProxyWithNamenodeProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries)381   private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
382       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
383       boolean withRetries) throws IOException {
384     NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
385         address, conf, ugi, NamenodeProtocolPB.class);
386     if (withRetries) { // create the proxy with retries
387       RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
388               TimeUnit.MILLISECONDS);
389       Map<String, RetryPolicy> methodNameToPolicyMap
390            = new HashMap<String, RetryPolicy>();
391       methodNameToPolicyMap.put("getBlocks", timeoutPolicy);
392       methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy);
393       NamenodeProtocol translatorProxy =
394           new NamenodeProtocolTranslatorPB(proxy);
395       return (NamenodeProtocol) RetryProxy.create(
396           NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap);
397     } else {
398       return new NamenodeProtocolTranslatorPB(proxy);
399     }
400   }
401 
createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth)402   private static ClientProtocol createNNProxyWithClientProtocol(
403       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
404       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
405       throws IOException {
406     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
407 
408     final RetryPolicy defaultPolicy =
409         RetryUtils.getDefaultRetryPolicy(
410             conf,
411             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
412             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
413             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
414             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
415             SafeModeException.class);
416 
417     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
418     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
419         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
420         NetUtils.getDefaultSocketFactory(conf),
421         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
422         fallbackToSimpleAuth).getProxy();
423 
424     if (withRetries) { // create the proxy with retries
425 
426       Map<String, RetryPolicy> methodNameToPolicyMap
427                  = new HashMap<String, RetryPolicy>();
428 
429       ClientProtocol translatorProxy =
430         new ClientNamenodeProtocolTranslatorPB(proxy);
431       return (ClientProtocol) RetryProxy.create(
432           ClientProtocol.class,
433           new DefaultFailoverProxyProvider<ClientProtocol>(
434               ClientProtocol.class, translatorProxy),
435           methodNameToPolicyMap,
436           defaultPolicy);
437     } else {
438       return new ClientNamenodeProtocolTranslatorPB(proxy);
439     }
440   }
441 
createNameNodeProxy(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, Class<?> xface)442   private static Object createNameNodeProxy(InetSocketAddress address,
443       Configuration conf, UserGroupInformation ugi, Class<?> xface)
444       throws IOException {
445     RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
446     Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
447         ugi, conf, NetUtils.getDefaultSocketFactory(conf));
448     return proxy;
449   }
450 
451   /** Gets the configured Failover proxy provider's class */
452   @VisibleForTesting
getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri)453   public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
454       Configuration conf, URI nameNodeUri) throws IOException {
455     if (nameNodeUri == null) {
456       return null;
457     }
458     String host = nameNodeUri.getHost();
459 
460     String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "."
461         + host;
462     try {
463       @SuppressWarnings("unchecked")
464       Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf
465           .getClass(configKey, null, FailoverProxyProvider.class);
466       return ret;
467     } catch (RuntimeException e) {
468       if (e.getCause() instanceof ClassNotFoundException) {
469         throw new IOException("Could not load failover proxy provider class "
470             + conf.get(configKey) + " which is configured for authority "
471             + nameNodeUri, e);
472       } else {
473         throw e;
474       }
475     }
476   }
477 
478   /** Creates the Failover proxy provider instance*/
479   @VisibleForTesting
createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth)480   public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
481       Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
482       AtomicBoolean fallbackToSimpleAuth) throws IOException {
483     Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
484     AbstractNNFailoverProxyProvider<T> providerNN;
485     Preconditions.checkArgument(
486         xface.isAssignableFrom(NamenodeProtocols.class),
487         "Interface %s is not a NameNode protocol", xface);
488     try {
489       // Obtain the class of the proxy provider
490       failoverProxyProviderClass = getFailoverProxyProviderClass(conf,
491           nameNodeUri);
492       if (failoverProxyProviderClass == null) {
493         return null;
494       }
495       // Create a proxy provider instance.
496       Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
497           .getConstructor(Configuration.class, URI.class, Class.class);
498       FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
499           xface);
500 
501       // If the proxy provider is of an old implementation, wrap it.
502       if (!(provider instanceof AbstractNNFailoverProxyProvider)) {
503         providerNN = new WrappedFailoverProxyProvider<T>(provider);
504       } else {
505         providerNN = (AbstractNNFailoverProxyProvider<T>)provider;
506       }
507     } catch (Exception e) {
508       String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
509       if (LOG.isDebugEnabled()) {
510         LOG.debug(message, e);
511       }
512       if (e.getCause() instanceof IOException) {
513         throw (IOException) e.getCause();
514       } else {
515         throw new IOException(message, e);
516       }
517     }
518 
519     // Check the port in the URI, if it is logical.
520     if (checkPort && providerNN.useLogicalURI()) {
521       int port = nameNodeUri.getPort();
522       if (port > 0 && port != NameNode.DEFAULT_PORT) {
523         // Throwing here without any cleanup is fine since we have not
524         // actually created the underlying proxies yet.
525         throw new IOException("Port " + port + " specified in URI "
526             + nameNodeUri + " but host '" + nameNodeUri.getHost()
527             + "' is a logical (HA) namenode"
528             + " and does not use port information.");
529       }
530     }
531     providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
532     return providerNN;
533   }
534 
535 }
536