/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgressMetrics;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.TraceAdminProtocol;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.ObjectName;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;

/**********************************************************
 * NameNode serves as both directory namespace manager and
 * "inode table" for the Hadoop DFS.  There is a single NameNode
 * running in any DFS deployment.  (Well, except when there
 * is a second backup/failover NameNode, or when using federated NameNodes.)
 *
 * The NameNode controls two critical tables:
 *   1)  filename->blocksequence (namespace)
 *   2)  block->machinelist ("inodes")
 *
 * The first table is stored on disk and is very precious.
 * The second table is rebuilt every time the NameNode comes up.
 *
 * 'NameNode' refers to both this class as well as the 'NameNode server'.
 * The 'FSNamesystem' class actually performs most of the filesystem
 * management.  The majority of the 'NameNode' class itself is concerned
 * with exposing the IPC interface and the HTTP server to the outside world,
 * plus some configuration management.
 *
 * NameNode implements the
 * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} interface, which
 * allows clients to ask for DFS services.
 * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} is not designed for
 * direct use by authors of DFS client code.  End-users should instead use the
 * {@link org.apache.hadoop.fs.FileSystem} class.
 *
 * NameNode also implements the
 * {@link org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol} interface,
 * used by DataNodes that actually store DFS data blocks.  These
 * methods are invoked repeatedly and automatically by all the
 * DataNodes in a DFS deployment.
 *
 * NameNode also implements the
 * {@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol} interface,
 * used by secondary namenodes or rebalancing processes to get partial
 * NameNode state, for example partial blocksMap etc.
 **********************************************************/
@InterfaceAudience.Private
public class NameNode implements NameNodeStatusMXBean {
  static {
    HdfsConfiguration.init();
  }

  /**
   * Categories of operations supported by the namenode.
   */
  public static enum OperationCategory {
    /** Operations that are state agnostic */
    UNCHECKED,
    /** Read operation that does not change the namespace state */
    READ,
    /** Write operation that changes the namespace state */
    WRITE,
    /** Operations related to checkpointing */
    CHECKPOINT,
    /** Operations related to {@link JournalProtocol} */
    JOURNAL
  }
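
  // A minimal sketch of how these categories are used (assuming the
  // FSNamesystem/HAState call pattern of this code base):
  //
  //   // reject namespace mutations unless this namenode is currently active
  //   namesystem.checkOperation(OperationCategory.WRITE);
  //
  // In the standby state such a check fails with a StandbyException, while
  // the active state lets the operation proceed.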

  /**
   * HDFS configuration can have three types of parameters:
   * <ol>
   * <li>Parameters that are common for all the name services in the cluster.</li>
   * <li>Parameters that are specific to a name service. These keys are suffixed
   * with nameserviceId in the configuration. For example,
   * "dfs.namenode.rpc-address.nameservice1".</li>
   * <li>Parameters that are specific to a single name node. These keys are suffixed
   * with nameserviceId and namenodeId in the configuration. For example,
   * "dfs.namenode.rpc-address.nameservice1.namenode1".</li>
   * </ol>
   *
   * In the latter two cases, operators may specify the configuration without
   * any suffix, with a nameservice suffix, or with a nameservice and namenode
   * suffix. The most specific suffix takes precedence.
   *
   * The keys listed here are specific to a given namenode, and thus may be
   * configured globally, for a nameservice, or for a specific namenode within
   * a nameservice.
   */
  public static final String[] NAMENODE_SPECIFIC_KEYS = {
    DFS_NAMENODE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_RPC_BIND_HOST_KEY,
    DFS_NAMENODE_NAME_DIR_KEY,
    DFS_NAMENODE_EDITS_DIR_KEY,
    DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
    DFS_NAMENODE_CHECKPOINT_DIR_KEY,
    DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
    DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
    DFS_NAMENODE_HTTP_ADDRESS_KEY,
    DFS_NAMENODE_HTTPS_ADDRESS_KEY,
    DFS_NAMENODE_HTTP_BIND_HOST_KEY,
    DFS_NAMENODE_HTTPS_BIND_HOST_KEY,
    DFS_NAMENODE_KEYTAB_FILE_KEY,
    DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
    DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY,
    DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY,
    DFS_NAMENODE_BACKUP_ADDRESS_KEY,
    DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
    DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
    DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
    DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
    DFS_HA_FENCE_METHODS_KEY,
    DFS_HA_ZKFC_PORT_KEY
  };
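
  // A sketch of how these keys are resolved (assuming nameservice "ns1" and
  // namenode "nn1"; see initializeGenericKeys and DFSUtil.setGenericConf):
  //
  //   dfs.namenode.rpc-address.ns1.nn1   // namenode-specific, checked first
  //   dfs.namenode.rpc-address.ns1       // nameservice-specific, checked next
  //   dfs.namenode.rpc-address           // unsuffixed value used otherwise
  //
  // The most specific value found is copied back into the unsuffixed key, so
  // the rest of the code can simply read "dfs.namenode.rpc-address".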

  /**
   * These keys are specific to a nameservice, but may not be overridden
   * for a specific namenode.
   * @see #NAMENODE_SPECIFIC_KEYS
   */
  public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
    DFS_HA_AUTO_FAILOVER_ENABLED_KEY
  };

  private static final String USAGE = "Usage: java NameNode ["
      + StartupOption.BACKUP.getName() + "] | \n\t["
      + StartupOption.CHECKPOINT.getName() + "] | \n\t["
      + StartupOption.FORMAT.getName() + " ["
      + StartupOption.CLUSTERID.getName() + " cid ] ["
      + StartupOption.FORCE.getName() + "] ["
      + StartupOption.NONINTERACTIVE.getName() + "] ] | \n\t["
      + StartupOption.UPGRADE.getName() +
        " [" + StartupOption.CLUSTERID.getName() + " cid]" +
        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
      + StartupOption.UPGRADEONLY.getName() +
        " [" + StartupOption.CLUSTERID.getName() + " cid]" +
        " [" + StartupOption.RENAMERESERVED.getName() + "<k-v pairs>] ] | \n\t["
      + StartupOption.ROLLBACK.getName() + "] | \n\t["
      + StartupOption.ROLLINGUPGRADE.getName() + " "
      + RollingUpgradeStartupOption.getAllOptionString() + " ] | \n\t["
      + StartupOption.FINALIZE.getName() + "] | \n\t["
      + StartupOption.IMPORT.getName() + "] | \n\t["
      + StartupOption.INITIALIZESHAREDEDITS.getName() + "] | \n\t["
      + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | \n\t["
      + StartupOption.RECOVER.getName() + " [ "
      + StartupOption.FORCE.getName() + "] ] | \n\t["
      + StartupOption.METADATAVERSION.getName() + " ]"
      + " ]";
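
  // Example invocations corresponding to the usage string above (assuming the
  // standard "hdfs namenode" launcher script):
  //
  //   hdfs namenode -format -clusterid CID -nonInteractive
  //   hdfs namenode -rollingUpgrade started
  //   hdfs namenode -recover -force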


  public long getProtocolVersion(String protocol,
                                 long clientVersion) throws IOException {
    if (protocol.equals(ClientProtocol.class.getName())) {
      return ClientProtocol.versionID;
    } else if (protocol.equals(DatanodeProtocol.class.getName())) {
      return DatanodeProtocol.versionID;
    } else if (protocol.equals(NamenodeProtocol.class.getName())) {
      return NamenodeProtocol.versionID;
    } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())) {
      return RefreshAuthorizationPolicyProtocol.versionID;
    } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())) {
      return RefreshUserMappingsProtocol.versionID;
    } else if (protocol.equals(RefreshCallQueueProtocol.class.getName())) {
      return RefreshCallQueueProtocol.versionID;
    } else if (protocol.equals(GetUserMappingsProtocol.class.getName())) {
      return GetUserMappingsProtocol.versionID;
    } else if (protocol.equals(TraceAdminProtocol.class.getName())) {
      return TraceAdminProtocol.versionID;
    } else {
      throw new IOException("Unknown protocol to name node: " + protocol);
    }
  }

  public static final int DEFAULT_PORT = 8020;
  public static final Logger LOG =
      LoggerFactory.getLogger(NameNode.class.getName());
  public static final Logger stateChangeLog =
      LoggerFactory.getLogger("org.apache.hadoop.hdfs.StateChange");
  public static final Logger blockStateChangeLog =
      LoggerFactory.getLogger("BlockStateChange");
  public static final HAState ACTIVE_STATE = new ActiveState();
  public static final HAState STANDBY_STATE = new StandbyState();

  protected FSNamesystem namesystem;
  protected final Configuration conf;
  protected final NamenodeRole role;
  private volatile HAState state;
  private final boolean haEnabled;
  private final HAContext haContext;
  protected final boolean allowStaleStandbyReads;
  private AtomicBoolean started = new AtomicBoolean(false);


  /** The HTTP server exposing the web UI and image transfer. */
  protected NameNodeHttpServer httpServer;
  private Thread emptier;
  /** Only used for testing purposes. */
  protected boolean stopRequested = false;
  /** Registration information of this name-node. */
  protected NamenodeRegistration nodeRegistration;
  /** Activated plug-ins. */
  private List<ServicePlugin> plugins;

  private NameNodeRpcServer rpcServer;

  private JvmPauseMonitor pauseMonitor;
  private ObjectName nameNodeStatusBeanName;
  SpanReceiverHost spanReceiverHost;
  /**
   * The namenode address that clients will use to access this namenode
   * or the name service. For HA configurations using a logical URI, it
   * will be the logical address.
   */
  private String clientNamenodeAddress;

  /** Format a new filesystem. Destroys any filesystem that may already
   * exist at this location. */
  public static void format(Configuration conf) throws IOException {
    format(conf, true, true);
  }

  static NameNodeMetrics metrics;
  private static final StartupProgress startupProgress = new StartupProgress();

  /** Return the {@link FSNamesystem} object.
   * @return {@link FSNamesystem} object.
   */
  public FSNamesystem getNamesystem() {
    return namesystem;
  }

  public NamenodeProtocols getRpcServer() {
    return rpcServer;
  }

  static void initMetrics(Configuration conf, NamenodeRole role) {
    metrics = NameNodeMetrics.create(conf, role);
  }

  public static NameNodeMetrics getNameNodeMetrics() {
    return metrics;
  }

  /**
   * Returns the object used for reporting namenode startup progress.
   *
   * @return StartupProgress for reporting namenode startup progress
   */
  public static StartupProgress getStartupProgress() {
    return startupProgress;
  }

  /**
   * Return the service name used in delegation tokens issued by this
   * namenode.
   *
   * @return the name service ID in HA mode, or the RPC address in non-HA mode
   */
  public String getTokenServiceName() {
    return getClientNamenodeAddress();
  }

  /**
   * Set the namenode address that will be used by clients to access this
   * namenode or name service. This needs to be called before the config
   * is overridden.
   */
  public void setClientNamenodeAddress(Configuration conf) {
    String nnAddr = conf.get(FS_DEFAULT_NAME_KEY);
    if (nnAddr == null) {
      // default fs is not set.
      clientNamenodeAddress = null;
      return;
    }

    LOG.info("{} is {}", FS_DEFAULT_NAME_KEY, nnAddr);
    URI nnUri = URI.create(nnAddr);

    String nnHost = nnUri.getHost();
    if (nnHost == null) {
      clientNamenodeAddress = null;
      return;
    }

    if (DFSUtil.getNameServiceIds(conf).contains(nnHost)) {
      // host name is logical
      clientNamenodeAddress = nnHost;
    } else if (nnUri.getPort() > 0) {
      // physical address with a valid port
      clientNamenodeAddress = nnUri.getAuthority();
    } else {
      // the port is missing or 0. Figure out the real bind address later.
      clientNamenodeAddress = null;
      return;
    }
    LOG.info("Clients are to use {} to access"
        + " this namenode/service.", clientNamenodeAddress);
  }
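
  // Examples (illustrative): with fs.defaultFS = hdfs://mycluster, where
  // "mycluster" is a configured nameservice ID, the logical name "mycluster"
  // is used; with fs.defaultFS = hdfs://nn1.example.com:8020 the physical
  // authority "nn1.example.com:8020" is used; with no port (as in a
  // MiniDFSCluster), the address is resolved later from the RPC server's
  // actual bind address.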

  /**
   * Get the namenode address to be used by clients.
   * @return the namenode address
   */
  public String getClientNamenodeAddress() {
    return clientNamenodeAddress;
  }

  public static InetSocketAddress getAddress(String address) {
    return NetUtils.createSocketAddr(address, DEFAULT_PORT);
  }

  /**
   * Set the configuration property for the service RPC address to the
   * given address.
   */
  public static void setServiceAddress(Configuration conf,
                                           String address) {
    LOG.info("Setting ADDRESS {}", address);
    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, address);
  }

  /**
   * Fetches the address for services to use when connecting to the namenode.
   * If the service RPC address is not configured, this returns null when
   * {@code fallback} is false, or the default namenode address (shared by
   * clients and services) when {@code fallback} is true.
   * Services here are datanodes, the backup node, and any other non-client
   * connection.
   */
  public static InetSocketAddress getServiceAddress(Configuration conf,
                                                        boolean fallback) {
    String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
    if (addr == null || addr.isEmpty()) {
      return fallback ? getAddress(conf) : null;
    }
    return getAddress(addr);
  }

  public static InetSocketAddress getAddress(Configuration conf) {
    URI filesystemURI = FileSystem.getDefaultUri(conf);
    return getAddress(filesystemURI);
  }


  /**
   * @return address of the file system
   */
  public static InetSocketAddress getAddress(URI filesystemURI) {
    String authority = filesystemURI.getAuthority();
    if (authority == null) {
      throw new IllegalArgumentException(String.format(
          "Invalid URI for NameNode address (check %s): %s has no authority.",
          FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString()));
    }
    if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
        filesystemURI.getScheme())) {
      throw new IllegalArgumentException(String.format(
          "Invalid URI for NameNode address (check %s): %s is not of scheme '%s'.",
          FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString(),
          HdfsConstants.HDFS_URI_SCHEME));
    }
    return getAddress(authority);
  }

  public static URI getUri(InetSocketAddress namenode) {
    int port = namenode.getPort();
    String portString = (port == DEFAULT_PORT) ? "" : (":" + port);
    return URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
        + namenode.getHostName() + portString);
  }
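
  // Example (illustrative): an address of nn1.example.com:8020 maps to
  // hdfs://nn1.example.com (the DEFAULT_PORT is elided), while
  // nn1.example.com:9000 maps to hdfs://nn1.example.com:9000.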

  //
  // Common NameNode methods implementation for the active name-node role.
  //
  public NamenodeRole getRole() {
    return role;
  }

  boolean isRole(NamenodeRole that) {
    return role.equals(that);
  }

  /**
   * Given a configuration, get the address of the service RPC server.
   * Returns null if the service RPC address is not configured.
   */
  protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
    return NameNode.getServiceAddress(conf, false);
  }

  protected InetSocketAddress getRpcServerAddress(Configuration conf) {
    return getAddress(conf);
  }

  /**
   * Given a configuration, get the bind host of the service RPC server.
   * Returns null if the bind host is not configured.
   */
  protected String getServiceRpcServerBindHost(Configuration conf) {
    String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
    if (addr == null || addr.isEmpty()) {
      return null;
    }
    return addr;
  }

  /**
   * Given a configuration, get the bind host of the client RPC server.
   * Returns null if the bind host is not configured.
   */
  protected String getRpcServerBindHost(Configuration conf) {
    String addr = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY);
    if (addr == null || addr.isEmpty()) {
      return null;
    }
    return addr;
  }

  /**
   * Modifies the passed-in configuration to contain the service RPC address
   * setting.
   */
  protected void setRpcServiceServerAddress(Configuration conf,
      InetSocketAddress serviceRPCAddress) {
    setServiceAddress(conf, NetUtils.getHostPortString(serviceRPCAddress));
  }

  protected void setRpcServerAddress(Configuration conf,
      InetSocketAddress rpcAddress) {
    FileSystem.setDefaultUri(conf, getUri(rpcAddress));
  }

  protected InetSocketAddress getHttpServerAddress(Configuration conf) {
    return getHttpAddress(conf);
  }

  /**
   * HTTP server address for binding the endpoint. This method is
   * for use by the NameNode and its derivatives. It may return
   * a different address than the one that should be used by clients to
   * connect to the NameNode. See
   * {@link DFSConfigKeys#DFS_NAMENODE_HTTP_BIND_HOST_KEY}
   *
   * @param conf the configuration to read the bind settings from
   * @return the address the HTTP server should bind to
   */
  protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
    InetSocketAddress bindAddress = getHttpServerAddress(conf);

    // If DFS_NAMENODE_HTTP_BIND_HOST_KEY exists then it overrides the
    // host name portion of DFS_NAMENODE_HTTP_ADDRESS_KEY.
    final String bindHost = conf.getTrimmed(DFS_NAMENODE_HTTP_BIND_HOST_KEY);
    if (bindHost != null && !bindHost.isEmpty()) {
      bindAddress = new InetSocketAddress(bindHost, bindAddress.getPort());
    }

    return bindAddress;
  }
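
  // Example (illustrative): with dfs.namenode.http-address set to
  // nn1.example.com:50070 and dfs.namenode.http-bind-host set to 0.0.0.0,
  // the HTTP server binds to 0.0.0.0:50070 while clients still reach it via
  // nn1.example.com:50070.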

  /** @return the NameNode HTTP address. */
  public static InetSocketAddress getHttpAddress(Configuration conf) {
    return NetUtils.createSocketAddr(
        conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
  }

  protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
  }

  NamenodeRegistration getRegistration() {
    return nodeRegistration;
  }

  NamenodeRegistration setRegistration() {
    nodeRegistration = new NamenodeRegistration(
        NetUtils.getHostPortString(rpcServer.getRpcAddress()),
        NetUtils.getHostPortString(getHttpAddress()),
        getFSImage().getStorage(), getRole());
    return nodeRegistration;
  }

  /* Optimize ugi lookup for RPC operations to avoid a trip through
   * UGI.getCurrentUser(), which is synchronized.
   */
  public static UserGroupInformation getRemoteUser() throws IOException {
    UserGroupInformation ugi = Server.getRemoteUser();
    return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
  }


  /**
   * Login as the configured user for the NameNode.
   */
  void loginAsNameNodeUser(Configuration conf) throws IOException {
    InetSocketAddress socAddr = getRpcServerAddress(conf);
    SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
  }

  /**
   * Initialize the name-node.
   *
   * @param conf the configuration
   */
  protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    if (NamenodeRole.NAMENODE == role) {
      startHttpServer(conf);
    }

    this.spanReceiverHost =
      SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);

    loadNamesystem(conf);

    rpcServer = createRpcServer(conf);
    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using
      // the RPC server's bind address.
      clientNamenodeAddress =
          NetUtils.getHostPortString(rpcServer.getRpcAddress());
      LOG.info("Clients are to use {} to access"
          + " this namenode/service.", clientNamenodeAddress);
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    startCommonServices(conf);
  }

  /**
   * Create the RPC server implementation. Used as an extension point for the
   * BackupNode.
   */
  protected NameNodeRpcServer createRpcServer(Configuration conf)
      throws IOException {
    return new NameNodeRpcServer(conf, this);
  }

  /** Start the services common to active and standby states. */
  private void startCommonServices(Configuration conf) throws IOException {
    namesystem.startCommonServices(conf, haContext);
    registerNNSMXBean();
    if (NamenodeRole.NAMENODE != role) {
      startHttpServer(conf);
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }
    rpcServer.start();
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
    for (ServicePlugin p : plugins) {
      try {
        p.start(this);
      } catch (Throwable t) {
        LOG.warn("ServicePlugin " + p + " could not be started", t);
      }
    }
    LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
    if (rpcServer.getServiceRpcAddress() != null) {
      LOG.info(getRole() + " service RPC up at: "
          + rpcServer.getServiceRpcAddress());
    }
  }

  private void stopCommonServices() {
    if (rpcServer != null) {
      rpcServer.stop();
    }
    if (namesystem != null) {
      namesystem.close();
    }
    if (pauseMonitor != null) {
      pauseMonitor.stop();
    }
    if (plugins != null) {
      for (ServicePlugin p : plugins) {
        try {
          p.stop();
        } catch (Throwable t) {
          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
        }
      }
    }
    stopHttpServer();
  }

  private void startTrashEmptier(final Configuration conf) throws IOException {
    long trashInterval =
        conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
    if (trashInterval == 0) {
      return;
    } else if (trashInterval < 0) {
      throw new IOException("Cannot start trash emptier with negative interval."
          + " Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
    }

    // This may be called from the transitionToActive code path, in which
    // case the current user is the administrator, not the NN. The trash
    // emptier needs to run as the NN. See HDFS-3972.
    FileSystem fs = SecurityUtil.doAsLoginUser(
        new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws IOException {
            return FileSystem.get(conf);
          }
        });
    this.emptier = new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier");
    this.emptier.setDaemon(true);
    this.emptier.start();
  }
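
  // Example (illustrative): setting fs.trash.interval to 1440 (minutes, i.e.
  // 24 hours) in core-site.xml enables the emptier with a daily deletion
  // interval for trash checkpoints; the default of 0 disables trash entirely,
  // and any negative value is rejected above.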

  private void stopTrashEmptier() {
    if (this.emptier != null) {
      emptier.interrupt();
      emptier = null;
    }
  }

  private void startHttpServer(final Configuration conf) throws IOException {
    httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
    httpServer.start();
    httpServer.setStartupProgress(startupProgress);
  }

  private void stopHttpServer() {
    try {
      if (httpServer != null) {
        httpServer.stop();
      }
    } catch (Exception e) {
      LOG.error("Exception while stopping httpserver", e);
    }
  }

  /**
   * Start NameNode.
   * <p>
   * The name-node can be started with one of the following startup options:
   * <ul>
   * <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
   * <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
   * <li>{@link StartupOption#BACKUP BACKUP} - start backup node</li>
   * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
   * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
   * upgrade and create a snapshot of the current file system state</li>
   * <li>{@link StartupOption#UPGRADEONLY UPGRADEONLY} - upgrade the cluster
   * and create a snapshot of the current file system state, then terminate
   * without starting the name node</li>
   * <li>{@link StartupOption#RECOVER RECOVER} - recover name node
   * metadata</li>
   * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
   *            cluster back to the previous state</li>
   * <li>{@link StartupOption#FINALIZE FINALIZE} - finalize
   *            previous upgrade</li>
   * <li>{@link StartupOption#IMPORT IMPORT} - import checkpoint</li>
   * </ul>
   * The option is passed via configuration field:
   * <tt>dfs.namenode.startup</tt>
   *
   * The conf will be modified to reflect the actual ports on which
   * the NameNode is up and running if the user passes the port as
   * <code>zero</code> in the conf.
   *
   * @param conf  configuration
   * @throws IOException
   */
  public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
  }

  protected NameNode(Configuration conf, NamenodeRole role)
      throws IOException {
    this.conf = conf;
    this.role = role;
    setClientNamenodeAddress(conf);
    String nsId = getNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
    state = createHAState(getStartupOption(conf));
    this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
    this.haContext = createHAContext();
    try {
      initializeGenericKeys(conf, nsId, namenodeId);
      initialize(conf);
      try {
        haContext.writeLock();
        state.prepareToEnterState(haContext);
        state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException | HadoopIllegalArgumentException e) {
      this.stop();
      throw e;
    }
    this.started.set(true);
  }

  protected HAState createHAState(StartupOption startOpt) {
    if (!haEnabled || startOpt == StartupOption.UPGRADE
        || startOpt == StartupOption.UPGRADEONLY) {
      return ACTIVE_STATE;
    } else {
      return STANDBY_STATE;
    }
  }
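
  // Example (illustrative): with HA enabled and a regular startup, the
  // namenode boots into STANDBY_STATE and serves no client traffic until an
  // administrator (or the ZKFC, when automatic failover is enabled) calls
  // transitionToActive(); during -upgrade or -upgradeOnly there is no
  // standby phase, so the node starts directly in ACTIVE_STATE.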

  protected HAContext createHAContext() {
    return new NameNodeHAContext();
  }

  /**
   * Wait for service to finish.
   * (Normally, it runs forever.)
   */
  public void join() {
    try {
      rpcServer.join();
    } catch (InterruptedException ie) {
      LOG.info("Caught interrupted exception", ie);
    }
  }

  /**
   * Stop all NameNode threads and wait for all to finish.
   */
  public void stop() {
    synchronized (this) {
      if (stopRequested)
        return;
      stopRequested = true;
    }
    try {
      if (state != null) {
        state.exitState(haContext);
      }
    } catch (ServiceFailedException e) {
      LOG.warn("Encountered exception while exiting state", e);
    } finally {
      stopCommonServices();
      if (metrics != null) {
        metrics.shutdown();
      }
      if (namesystem != null) {
        namesystem.shutdown();
      }
      if (nameNodeStatusBeanName != null) {
        MBeans.unregister(nameNodeStatusBeanName);
        nameNodeStatusBeanName = null;
      }
      if (this.spanReceiverHost != null) {
        this.spanReceiverHost.closeReceivers();
      }
    }
  }

  synchronized boolean isStopRequested() {
    return stopRequested;
  }

  /**
   * Is the cluster currently in safe mode?
   */
  public boolean isInSafeMode() {
    return namesystem.isInSafeMode();
  }

  /** get FSImage */
  @VisibleForTesting
  public FSImage getFSImage() {
    return namesystem.getFSImage();
  }

  /**
   * @return NameNode RPC address
   */
  public InetSocketAddress getNameNodeAddress() {
    return rpcServer.getRpcAddress();
  }

  /**
   * @return NameNode RPC address in "host:port" string form
   */
  public String getNameNodeAddressHostPortString() {
    return NetUtils.getHostPortString(rpcServer.getRpcAddress());
  }

  /**
   * @return NameNode service RPC address if configured, the
   *    NameNode RPC address otherwise
   */
  public InetSocketAddress getServiceRpcAddress() {
    final InetSocketAddress serviceAddr = rpcServer.getServiceRpcAddress();
    return serviceAddr == null ? rpcServer.getRpcAddress() : serviceAddr;
  }

  /**
   * @return NameNode HTTP address, used by the Web UI, image transfer,
   *    and HTTP-based file system clients like Hftp and WebHDFS
   */
  public InetSocketAddress getHttpAddress() {
    return httpServer.getHttpAddress();
  }

  /**
   * @return NameNode HTTPS address, used by the Web UI, image transfer,
   *    and HTTP-based file system clients like Hftp and WebHDFS
   */
  public InetSocketAddress getHttpsAddress() {
    return httpServer.getHttpsAddress();
  }

  /**
   * Verify that configured directories exist, then interactively confirm
   * that formatting is desired for each existing directory, and format them.
   *
   * @param conf configuration to use
   * @param force if true, format regardless of whether dirs exist
   * @param isInteractive if true, prompt the user before formatting existing
   *          directories
   * @return true if formatting was aborted, false otherwise
   * @throws IOException
   */
  private static boolean format(Configuration conf, boolean force,
      boolean isInteractive) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);
    checkAllowFormat(conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      InetSocketAddress socAddr = getAddress(conf);
      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
          DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
    }

    Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
    List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
    List<URI> dirsToPrompt = new ArrayList<URI>();
    dirsToPrompt.addAll(nameDirsToFormat);
    dirsToPrompt.addAll(sharedDirs);
    List<URI> editDirsToFormat =
                 FSNamesystem.getNamespaceEditsDirs(conf);

    // If no cluster ID was provided, generate a new one.
    String clusterId = StartupOption.FORMAT.getClusterId();
    if (clusterId == null || clusterId.isEmpty()) {
      clusterId = NNStorage.newClusterID();
    }
    System.out.println("Formatting using clusterid: " + clusterId);

    FSImage fsImage = new FSImage(conf, nameDirsToFormat, editDirsToFormat);
    try {
      FSNamesystem fsn = new FSNamesystem(conf, fsImage);
      fsImage.getEditLog().initJournalsForWrite();

      if (!fsImage.confirmFormat(force, isInteractive)) {
        return true; // aborted
      }

      fsImage.format(fsn, clusterId);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception during format: ", ioe);
      fsImage.close();
      throw ioe;
    }
    return false;
  }

  public static void checkAllowFormat(Configuration conf) throws IOException {
    if (!conf.getBoolean(DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY,
        DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT)) {
      throw new IOException("The option " + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY
                + " is set to false for this filesystem, so it "
                + "cannot be formatted. You will need to set the "
                + DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY + " parameter "
                + "to true in order to format this filesystem");
    }
  }
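
  // Example (illustrative): production clusters often set
  // dfs.namenode.support.allow.format to false in hdfs-site.xml so that an
  // accidental "hdfs namenode -format" fails fast with the IOException above.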

  @VisibleForTesting
  public static boolean initializeSharedEdits(Configuration conf) throws IOException {
    return initializeSharedEdits(conf, true);
  }

  @VisibleForTesting
  public static boolean initializeSharedEdits(Configuration conf,
      boolean force) throws IOException {
    return initializeSharedEdits(conf, force, false);
  }

  /**
   * Clone the supplied configuration but remove the shared edits dirs.
   *
   * @param conf Supplies the original configuration.
   * @return Cloned configuration without the shared edit dirs.
   * @throws IOException on failure to generate the configuration.
   */
  private static Configuration getConfigurationWithoutSharedEdits(
      Configuration conf)
      throws IOException {
    List<URI> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf, false);
    String editsDirsString = Joiner.on(",").join(editsDirs);

    Configuration confWithoutShared = new Configuration(conf);
    confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
    confWithoutShared.setStrings(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
        editsDirsString);
    return confWithoutShared;
  }

  /**
   * Format a new shared edits dir and copy in enough edit log segments so that
   * the standby NN can start up.
   *
   * @param conf configuration
   * @param force format regardless of whether or not the shared edits dir exists
   * @param interactive prompt the user when a dir exists
   * @return true if the command aborts, false otherwise
   */
  private static boolean initializeSharedEdits(Configuration conf,
      boolean force, boolean interactive) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);

    if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
      LOG.error("No shared edits directory configured for namespace " +
          nsId + " namenode " + namenodeId);
      return false;
    }

    if (UserGroupInformation.isSecurityEnabled()) {
      InetSocketAddress socAddr = getAddress(conf);
      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
          DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
    }

    NNStorage existingStorage = null;
    FSImage sharedEditsImage = null;
    try {
      FSNamesystem fsns =
          FSNamesystem.loadFromDisk(getConfigurationWithoutSharedEdits(conf));

      existingStorage = fsns.getFSImage().getStorage();
      NamespaceInfo nsInfo = existingStorage.getNamespaceInfo();

      List<URI> sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);

      sharedEditsImage = new FSImage(conf,
          Lists.<URI>newArrayList(),
          sharedEditsDirs);
      sharedEditsImage.getEditLog().initJournalsForWrite();

      if (!sharedEditsImage.confirmFormat(force, interactive)) {
        return true; // abort
      }

      NNStorage newSharedStorage = sharedEditsImage.getStorage();
      // Call Storage.format instead of FSImage.format here, since we don't
      // actually want to save a checkpoint - just prime the dirs with
      // the existing namespace info
      newSharedStorage.format(nsInfo);
      sharedEditsImage.getEditLog().formatNonFileJournals(nsInfo);

      // Need to make sure the edit log segments are in good shape to initialize
      // the shared edits dir.
      fsns.getFSImage().getEditLog().close();
      fsns.getFSImage().getEditLog().initJournalsForWrite();
      fsns.getFSImage().getEditLog().recoverUnclosedStreams();

      copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
          conf);
    } catch (IOException ioe) {
      LOG.error("Could not initialize shared edits dir", ioe);
      return true; // aborted
    } finally {
      if (sharedEditsImage != null) {
        try {
          sharedEditsImage.close();
        } catch (IOException ioe) {
          LOG.warn("Could not close sharedEditsImage", ioe);
        }
      }
      // Have to unlock storage explicitly for the case when we're running in a
      // unit test, which runs in the same JVM as NNs.
      if (existingStorage != null) {
        try {
          existingStorage.unlockAll();
        } catch (IOException ioe) {
          LOG.warn("Could not unlock storage directories", ioe);
          return true; // aborted
        }
      }
    }
    return false; // did not abort
  }

  private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
      Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
      Configuration conf) throws IOException {
    Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
        "No shared edits specified");
    // Copy edit log segments into the new shared edits dir.
    List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
    FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
        sharedEditsUris);
    newSharedEditLog.initJournalsForWrite();
    newSharedEditLog.recoverUnclosedStreams();

    FSEditLog sourceEditLog = fsns.getFSImage().editLog;

    long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();

    Collection<EditLogInputStream> streams = null;
    try {
      streams = sourceEditLog.selectInputStreams(fromTxId + 1, 0);

      // Set the next txid to the checkpoint txid + 1
      newSharedEditLog.setNextTxId(fromTxId + 1);

      // Copy all edits after the last checkpoint txid to the shared edits dir
      for (EditLogInputStream stream : streams) {
        LOG.debug("Beginning to copy stream " + stream + " to shared edits");
        FSEditLogOp op;
        boolean segmentOpen = false;
        while ((op = stream.readOp()) != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("copying op: " + op);
          }
          if (!segmentOpen) {
            newSharedEditLog.startLogSegment(op.txid, false);
            segmentOpen = true;
          }

          newSharedEditLog.logEdit(op);

          if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
            newSharedEditLog.logSync();
            newSharedEditLog.endCurrentLogSegment(false);
            LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
                + stream);
            segmentOpen = false;
          }
        }

        if (segmentOpen) {
          LOG.debug("ending log segment because of end of stream in " + stream);
          newSharedEditLog.logSync();
          newSharedEditLog.endCurrentLogSegment(false);
          segmentOpen = false;
        }
      }
    } finally {
      if (streams != null) {
        FSEditLog.closeAllStreams(streams);
      }
    }
  }

  @VisibleForTesting
  public static boolean doRollback(Configuration conf,
      boolean isConfirmationNeeded) throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);

    FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
    System.err.print(
        "\"rollBack\" will remove the current state of the file system,\n"
        + "returning you to the state prior to initiating your recent\n"
        + "upgrade. This action is permanent and cannot be undone. If you\n"
        + "are performing a rollback in an HA environment, you should be\n"
        + "certain that no NameNode process is running on any host.");
    if (isConfirmationNeeded) {
      if (!confirmPrompt("Roll back file system state?")) {
        System.err.println("Rollback aborted.");
        return true;
      }
    }
    nsys.getFSImage().doRollback(nsys);
    return false;
  }
1214 
printUsage(PrintStream out)1215   private static void printUsage(PrintStream out) {
1216     out.println(USAGE + "\n");
1217   }
1218 
1219   @VisibleForTesting
parseArguments(String args[])1220   static StartupOption parseArguments(String args[]) {
1221     int argsLen = (args == null) ? 0 : args.length;
    StartupOption startOpt = StartupOption.REGULAR;
    for (int i = 0; i < argsLen; i++) {
      String cmd = args[i];
      if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.FORMAT;
        for (i = i + 1; i < argsLen; i++) {
          if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
            i++;
            if (i >= argsLen) {
              // if no cluster id specified, return null
              LOG.error("Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName() + " flag");
              return null;
            }
            String clusterId = args[i];
            // Make sure an id is specified and not another flag
            if (clusterId.isEmpty() ||
                clusterId.equalsIgnoreCase(StartupOption.FORCE.getName()) ||
                clusterId.equalsIgnoreCase(
                    StartupOption.NONINTERACTIVE.getName())) {
              LOG.error("Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName() + " flag");
              return null;
            }
            startOpt.setClusterId(clusterId);
          }

          if (args[i].equalsIgnoreCase(StartupOption.FORCE.getName())) {
            startOpt.setForceFormat(true);
          }

          if (args[i].equalsIgnoreCase(StartupOption.NONINTERACTIVE.getName())) {
            startOpt.setInteractiveFormat(false);
          }
        }
      } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.GENCLUSTERID;
      } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.REGULAR;
      } else if (StartupOption.BACKUP.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.BACKUP;
      } else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.CHECKPOINT;
      } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
          || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
            StartupOption.UPGRADE : StartupOption.UPGRADEONLY;
        /* Can be followed by CLUSTERID with a required parameter or
         * RENAMERESERVED with an optional parameter
         */
        while (i + 1 < argsLen) {
          String flag = args[i + 1];
          if (flag.equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
            if (i + 2 < argsLen) {
              i += 2;
              startOpt.setClusterId(args[i]);
            } else {
              LOG.error("Must specify a valid cluster ID after the "
                  + StartupOption.CLUSTERID.getName() + " flag");
              return null;
            }
          } else if (flag.equalsIgnoreCase(StartupOption.RENAMERESERVED
              .getName())) {
            if (i + 2 < argsLen) {
              FSImageFormat.setRenameReservedPairs(args[i + 2]);
              i += 2;
            } else {
              FSImageFormat.useDefaultRenameReservedPairs();
              i += 1;
            }
          } else {
            LOG.error("Unknown upgrade flag " + flag);
            return null;
          }
        }
      } else if (StartupOption.ROLLINGUPGRADE.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.ROLLINGUPGRADE;
        ++i;
        if (i >= argsLen) {
          LOG.error("Must specify a rolling upgrade startup option "
              + RollingUpgradeStartupOption.getAllOptionString());
          return null;
        }
        startOpt.setRollingUpgradeStartupOption(args[i]);
      } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.ROLLBACK;
      } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.FINALIZE;
      } else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.IMPORT;
      } else if (StartupOption.BOOTSTRAPSTANDBY.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.BOOTSTRAPSTANDBY;
        return startOpt;
      } else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.INITIALIZESHAREDEDITS;
        for (i = i + 1; i < argsLen; i++) {
          if (StartupOption.NONINTERACTIVE.getName().equals(args[i])) {
            startOpt.setInteractiveFormat(false);
          } else if (StartupOption.FORCE.getName().equals(args[i])) {
            startOpt.setForceFormat(true);
          } else {
            LOG.error("Invalid argument: " + args[i]);
            return null;
          }
        }
        return startOpt;
      } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
        if (startOpt != StartupOption.REGULAR) {
          throw new RuntimeException("Can't combine -recover with " +
              "other startup options.");
        }
        startOpt = StartupOption.RECOVER;
        while (++i < argsLen) {
          if (args[i].equalsIgnoreCase(
                StartupOption.FORCE.getName())) {
            startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
          } else {
            throw new RuntimeException("Error parsing recovery options: " +
              "can't understand option \"" + args[i] + "\"");
          }
        }
      } else if (StartupOption.METADATAVERSION.getName().equalsIgnoreCase(cmd)) {
        startOpt = StartupOption.METADATAVERSION;
      } else {
        return null;
      }
    }
    return startOpt;
  }
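
  /*
   * Illustrative sketch only (not part of the NameNode API; the variable
   * names are hypothetical): how a typical argument vector flows through
   * parseArguments(). The flag strings assume the standard option names
   * defined in StartupOption.
   *
   *   StartupOption opt = parseArguments(
   *       new String[] { "-format", "-clusterid", "CID-1234", "-force" });
   *   // opt == StartupOption.FORMAT, opt.getClusterId() -> "CID-1234",
   *   // opt.getForceFormat() -> true
   *
   *   // A trailing -clusterid with no value is rejected:
   *   parseArguments(new String[] { "-format", "-clusterid" }); // -> null
   */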

  private static void setStartupOption(Configuration conf, StartupOption opt) {
    conf.set(DFS_NAMENODE_STARTUP_KEY, opt.name());
  }

  static StartupOption getStartupOption(Configuration conf) {
    return StartupOption.valueOf(conf.get(DFS_NAMENODE_STARTUP_KEY,
                                          StartupOption.REGULAR.toString()));
  }
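
  /*
   * Sketch of the round trip through the Configuration (hypothetical
   * caller): the parsed option is stashed under DFS_NAMENODE_STARTUP_KEY by
   * enum name and recovered with valueOf(); absent a prior set(), the
   * REGULAR default is returned.
   *
   *   Configuration conf = new HdfsConfiguration();
   *   setStartupOption(conf, StartupOption.UPGRADE);
   *   getStartupOption(conf);  // -> StartupOption.UPGRADE
   */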

  private static void doRecovery(StartupOption startOpt, Configuration conf)
      throws IOException {
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    initializeGenericKeys(conf, nsId, namenodeId);
    if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
      if (!confirmPrompt("You have selected Metadata Recovery mode.  " +
          "This mode is intended to recover lost metadata on a corrupt " +
          "filesystem.  Metadata recovery mode often permanently deletes " +
          "data from your HDFS filesystem.  Please back up your edit log " +
          "and fsimage before trying this!\n\n" +
          "Are you ready to proceed? (Y/N)\n")) {
        System.err.println("Recovery aborted at user request.\n");
        return;
      }
    }
    MetaRecoveryContext.LOG.info("starting recovery...");
    UserGroupInformation.setConfiguration(conf);
    NameNode.initMetrics(conf, startOpt.toNodeRole());
    FSNamesystem fsn = null;
    try {
      fsn = FSNamesystem.loadFromDisk(conf);
      fsn.getFSImage().saveNamespace(fsn);
      MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
    } catch (IOException e) {
      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
      throw e;
    } catch (RuntimeException e) {
      MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
      throw e;
    } finally {
      if (fsn != null) {
        fsn.close();
      }
    }
  }
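
  /*
   * Typical operator-facing invocations that reach doRecovery() (flag names
   * as parsed by parseArguments() above):
   *
   *   hdfs namenode -recover          # interactive; prompts before
   *                                   # destructive steps
   *   hdfs namenode -recover -force   # auto-picks the first choice at
   *                                   # recovery prompts
   */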

  /**
   * Verify that configured directories exist, then print the metadata
   * versions of the software and the image.
   *
   * @param conf configuration to use
   * @throws IOException
   */
  private static boolean printMetadataVersion(Configuration conf)
    throws IOException {
    final String nsId = DFSUtil.getNamenodeNameServiceId(conf);
    final String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    NameNode.initializeGenericKeys(conf, nsId, namenodeId);
    final FSImage fsImage = new FSImage(conf);
    final FSNamesystem fs = new FSNamesystem(conf, fsImage, false);
    return fsImage.recoverTransitionRead(
      StartupOption.METADATAVERSION, fs, null);
  }

  public static NameNode createNameNode(String[] argv, Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null) {
      conf = new HdfsConfiguration();
    }
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      case FORMAT: {
        boolean aborted = format(conf, startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid javac warning
      }
      case GENCLUSTERID: {
        System.err.println("Generating new cluster id:");
        System.out.println(NNStorage.newClusterID());
        terminate(0);
        return null;
      }
      case FINALIZE: {
        System.err.println("Use of the argument '" + StartupOption.FINALIZE +
            "' is no longer supported. To finalize an upgrade, start the NN " +
            "and then run `hdfs dfsadmin -finalizeUpgrade'");
        terminate(1);
        return null; // avoid javac warning
      }
      case ROLLBACK: {
        boolean aborted = doRollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BOOTSTRAPSTANDBY: {
        String[] toolArgs = Arrays.copyOfRange(argv, 1, argv.length);
        int rc = BootstrapStandby.run(toolArgs, conf);
        terminate(rc);
        return null; // avoid warning
      }
      case INITIALIZESHAREDEDITS: {
        boolean aborted = initializeSharedEdits(conf,
            startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BACKUP:
      case CHECKPOINT: {
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
        return new BackupNode(conf, role);
      }
      case RECOVER: {
        NameNode.doRecovery(startOpt, conf);
        return null;
      }
      case METADATAVERSION: {
        printMetadataVersion(conf);
        terminate(0);
        return null; // avoid javac warning
      }
      case UPGRADEONLY: {
        DefaultMetricsSystem.initialize("NameNode");
        new NameNode(conf);
        terminate(0);
        return null;
      }
      default: {
        DefaultMetricsSystem.initialize("NameNode");
        return new NameNode(conf);
      }
    }
  }
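
  /*
   * Minimal embedding sketch (test-style, hypothetical): createNameNode()
   * either returns a running NameNode (REGULAR/BACKUP/CHECKPOINT startup)
   * or performs a one-shot action and returns null, so callers must
   * null-check, as main() below does.
   *
   *   Configuration conf = new HdfsConfiguration();
   *   NameNode nn = NameNode.createNameNode(new String[] {}, conf);
   *   if (nn != null) {
   *     nn.join();  // block until the NameNode stops
   *   }
   */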

  /**
   * In a federated setup, configuration is set for a group of namenodes and
   * secondary namenode/backup/checkpointer nodes, all grouped under a
   * logical nameservice ID. The configuration keys specific to them have a
   * suffix set to the configured nameserviceId.
   *
   * This method copies the value from the specific key of the form
   * key.nameserviceId to the plain key, to set up the generic configuration.
   * Once this is done, only the generic version of the configuration is read
   * in the rest of the code, for backward compatibility and simpler code
   * changes.
   *
   * @param conf
   *          Configuration object in which to look up the specific keys and
   *          on which to set the generic values. Note the conf object is
   *          modified.
   * @param nameserviceId name service Id (to distinguish federated NNs)
   * @param namenodeId the namenode ID (to distinguish HA NNs)
   * @see DFSUtil#setGenericConf(Configuration, String, String, String...)
   */
  public static void initializeGenericKeys(Configuration conf,
      String nameserviceId, String namenodeId) {
    if ((nameserviceId != null && !nameserviceId.isEmpty()) ||
        (namenodeId != null && !namenodeId.isEmpty())) {
      if (nameserviceId != null) {
        conf.set(DFS_NAMESERVICE_ID, nameserviceId);
      }
      if (namenodeId != null) {
        conf.set(DFS_HA_NAMENODE_ID_KEY, namenodeId);
      }

      DFSUtil.setGenericConf(conf, nameserviceId, namenodeId,
          NAMENODE_SPECIFIC_KEYS);
      DFSUtil.setGenericConf(conf, nameserviceId, null,
          NAMESERVICE_SPECIFIC_KEYS);
    }

    // If the RPC address is set use it to (re-)configure the default FS
    if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
      URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
          + conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
      conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString());
      LOG.debug("Setting " + FS_DEFAULT_NAME_KEY + " to " + defaultUri.toString());
    }
  }
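
  /*
   * Concrete sketch of the key rewriting described above, with hypothetical
   * nameservice/namenode ids "ns1"/"nn1": the suffixed, instance-specific
   * key is copied onto its generic name (and the default FS is re-derived
   * from the RPC address) so the rest of the code stays suffix-agnostic.
   *
   *   conf.set("dfs.namenode.rpc-address.ns1.nn1", "host1:8020");
   *   NameNode.initializeGenericKeys(conf, "ns1", "nn1");
   *   conf.get("dfs.namenode.rpc-address");  // -> "host1:8020"
   *   conf.get("fs.defaultFS");              // -> "hdfs://host1:8020"
   */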

  /**
   * Get the name service Id for the node.
   * @return name service Id or null if federation is not configured
   */
  protected String getNameServiceId(Configuration conf) {
    return DFSUtil.getNamenodeNameServiceId(conf);
  }

  /**
   * Command-line entry point for the NameNode.
   */
  public static void main(String[] argv) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }
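
  /*
   * Common command lines that reach main() (see NameNode.USAGE for the
   * authoritative list):
   *
   *   hdfs namenode                      # regular startup
   *   hdfs namenode -format              # format a new filesystem
   *   hdfs namenode -upgrade             # start and upgrade the on-disk
   *                                      # layout
   *   hdfs namenode -rollingUpgrade started
   *   hdfs namenode -bootstrapStandby    # seed a standby from the active NN
   */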

  synchronized void monitorHealth()
      throws HealthCheckFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      return; // no-op, if HA is not enabled
    }
    getNamesystem().checkAvailableResources();
    if (!getNamesystem().nameNodeHasResourcesAvailable()) {
      throw new HealthCheckFailedException(
          "The NameNode has no resources available");
    }
  }

  synchronized void transitionToActive()
      throws ServiceFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      throw new ServiceFailedException("HA for namenode is not enabled");
    }
    state.setState(haContext, ACTIVE_STATE);
  }

  synchronized void transitionToStandby()
      throws ServiceFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      throw new ServiceFailedException("HA for namenode is not enabled");
    }
    state.setState(haContext, STANDBY_STATE);
  }

  synchronized HAServiceStatus getServiceStatus()
      throws ServiceFailedException, AccessControlException {
    namesystem.checkSuperuserPrivilege();
    if (!haEnabled) {
      throw new ServiceFailedException("HA for namenode is not enabled");
    }
    if (state == null) {
      return new HAServiceStatus(HAServiceState.INITIALIZING);
    }
    HAServiceState retState = state.getServiceState();
    HAServiceStatus ret = new HAServiceStatus(retState);
    if (retState == HAServiceState.STANDBY) {
      String safemodeTip = namesystem.getSafeModeTip();
      if (!safemodeTip.isEmpty()) {
        ret.setNotReadyToBecomeActive(
            "The NameNode is in safemode. " +
            safemodeTip);
      } else {
        ret.setReadyToBecomeActive();
      }
    } else if (retState == HAServiceState.ACTIVE) {
      ret.setReadyToBecomeActive();
    } else {
      ret.setNotReadyToBecomeActive("State is " + state);
    }
    return ret;
  }

  synchronized HAServiceState getServiceState() {
    if (state == null) {
      return HAServiceState.INITIALIZING;
    }
    return state.getServiceState();
  }

  /**
   * Register NameNodeStatusMXBean.
   */
  private void registerNNSMXBean() {
    nameNodeStatusBeanName = MBeans.register("NameNode", "NameNodeStatus", this);
  }

  @Override // NameNodeStatusMXBean
  public String getNNRole() {
    String roleStr = "";
    NamenodeRole role = getRole();
    if (null != role) {
      roleStr = role.toString();
    }
    return roleStr;
  }

  @Override // NameNodeStatusMXBean
  public String getState() {
    String servStateStr = "";
    HAServiceState servState = getServiceState();
    if (null != servState) {
      servStateStr = servState.toString();
    }
    return servStateStr;
  }

  @Override // NameNodeStatusMXBean
  public String getHostAndPort() {
    return getNameNodeAddressHostPortString();
  }

  @Override // NameNodeStatusMXBean
  public boolean isSecurityEnabled() {
    return UserGroupInformation.isSecurityEnabled();
  }

  @Override // NameNodeStatusMXBean
  public long getLastHATransitionTime() {
    return state.getLastHATransitionTime();
  }

  /**
   * Shut down the NN immediately in an ungraceful way. Used when it would be
   * unsafe for the NN to continue operating, e.g. during a failed HA state
   * transition.
   *
   * @param t exception which warrants the shutdown. Printed to the NN log
   *          before exit.
   * @throws ExitException thrown only for testing.
   */
  protected synchronized void doImmediateShutdown(Throwable t)
      throws ExitException {
    String message = "Error encountered requiring NN shutdown. " +
        "Shutting down immediately.";
    try {
      LOG.error(message, t);
    } catch (Throwable ignored) {
      // This is unlikely to happen, but there's nothing we can do if it does.
    }
    terminate(1, t);
  }

  /**
   * Class used to expose {@link NameNode} as context to {@link HAState}.
   */
  protected class NameNodeHAContext implements HAContext {
    @Override
    public void setState(HAState s) {
      state = s;
    }

    @Override
    public HAState getState() {
      return state;
    }

    @Override
    public void startActiveServices() throws IOException {
      try {
        namesystem.startActiveServices();
        startTrashEmptier(conf);
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void stopActiveServices() throws IOException {
      try {
        if (namesystem != null) {
          namesystem.stopActiveServices();
        }
        stopTrashEmptier();
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void startStandbyServices() throws IOException {
      try {
        namesystem.startStandbyServices(conf);
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void prepareToStopStandbyServices() throws ServiceFailedException {
      try {
        namesystem.prepareToStopStandbyServices();
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void stopStandbyServices() throws IOException {
      try {
        if (namesystem != null) {
          namesystem.stopStandbyServices();
        }
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

    @Override
    public void writeLock() {
      // Take the namesystem write lock first, then the retry cache lock.
      namesystem.writeLock();
      namesystem.lockRetryCache();
    }

    @Override
    public void writeUnlock() {
      // Release in the reverse order of acquisition.
      namesystem.unlockRetryCache();
      namesystem.writeUnlock();
    }

    /** Check if an operation of given category is allowed. */
    @Override
    public void checkOperation(final OperationCategory op)
        throws StandbyException {
      state.checkOperation(haContext, op);
    }

    @Override
    public boolean allowStaleReads() {
      return allowStaleStandbyReads;
    }

  }

  public boolean isStandbyState() {
    return (state.equals(STANDBY_STATE));
  }

  public boolean isActiveState() {
    return (state.equals(ACTIVE_STATE));
  }

  /**
   * Returns whether the NameNode is completely started.
   */
  boolean isStarted() {
    return this.started.get();
  }

  /**
   * Check that a request to change this node's HA state is valid.
   * In particular, if automatic failover is enabled, non-forced requests
   * from the HAAdmin CLI are rejected; conversely, if it is disabled,
   * requests from the ZK failover controller are rejected.
   *
   * @param req the request to check
   * @throws AccessControlException if the request is disallowed
   */
  void checkHaStateChange(StateChangeRequestInfo req)
      throws AccessControlException {
    boolean autoHaEnabled = conf.getBoolean(DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
        DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
    switch (req.getSource()) {
    case REQUEST_BY_USER:
      if (autoHaEnabled) {
        throw new AccessControlException(
            "Manual HA control for this NameNode is disallowed, because " +
            "automatic HA is enabled.");
      }
      break;
    case REQUEST_BY_USER_FORCED:
      if (autoHaEnabled) {
        LOG.warn("Allowing manual HA control from " +
            Server.getRemoteAddress() +
            " even though automatic HA is enabled, because the user " +
            "specified the force flag");
      }
      break;
    case REQUEST_BY_ZKFC:
      if (!autoHaEnabled) {
        throw new AccessControlException(
            "Request from ZK failover controller at " +
            Server.getRemoteAddress() + " denied since automatic HA " +
            "is not enabled");
      }
      break;
    }
  }
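
  /*
   * Operator-facing consequence of the checks above (commands from the
   * standard HA admin CLI; "nn1" is a hypothetical NameNode id): with
   * automatic failover enabled, a plain manual transition is refused unless
   * the force flag is supplied.
   *
   *   hdfs haadmin -transitionToActive nn1                # rejected
   *   hdfs haadmin -transitionToActive --forcemanual nn1  # allowed, with a
   *                                                       # warning logged
   */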
}