/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;

import com.google.common.base.Supplier;

/**
 * Static utility functions useful for testing HA.
 */
public abstract class HATestUtil {
  private static final Log LOG = LogFactory.getLog(HATestUtil.class);

  private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d";

  /**
   * Trigger an edit log roll on the active NameNode and then wait for the
   * standby to catch up to all the edits done by the active. This method
   * checks repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fails
   * by throwing {@link CouldNotCatchUpException}.
   *
   * @param active active NN
   * @param standby standby NN which should catch up to active
   * @throws IOException if an error occurs rolling the edit log
   * @throws CouldNotCatchUpException if the standby doesn't catch up to the
   *         active in NN_LAG_TIMEOUT milliseconds
   */
  public static void waitForStandbyToCatchUp(NameNode active,
      NameNode standby) throws InterruptedException, IOException,
      CouldNotCatchUpException {
    long activeTxId = active.getNamesystem().getFSImage().getEditLog()
      .getLastWrittenTxId();

    active.getRpcServer().rollEditLog();

    long start = Time.now();
    while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) {
      long nn2HighestTxId = standby.getNamesystem().getFSImage()
        .getLastAppliedTxId();
      if (nn2HighestTxId >= activeTxId) {
        return;
      }
      Thread.sleep(TestEditLogTailer.SLEEP_TIME);
    }
    throw new CouldNotCatchUpException("Standby did not catch up to txid " +
        activeTxId + " (currently at " +
        standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
  }
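
  /*
   * Usage sketch: assuming an HA MiniDFSCluster where the active NameNode
   * sits at index 0 and the standby at index 1, a test can force the standby
   * to see all edits before asserting on its namespace:
   *
   *   NameNode active = cluster.getNameNode(0);
   *   NameNode standby = cluster.getNameNode(1);
   *   HATestUtil.waitForStandbyToCatchUp(active, standby);
   */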

  /**
   * Wait for the datanodes in the cluster to process any block
   * deletions that have already been asynchronously queued.
   */
  public static void waitForDNDeletions(final MiniDFSCluster cluster)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        for (DataNode dn : cluster.getDataNodes()) {
          if (DataNodeTestUtils.getPendingAsyncDeletions(dn) > 0) {
            return false;
          }
        }
        return true;
      }
    }, 1000, 10000);
  }

  /**
   * Wait for the NameNode to issue any deletions that are already
   * pending (i.e. for the pendingDeletionBlocksCount to go to 0).
   */
  public static void waitForNNToIssueDeletions(final NameNode nn)
      throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        LOG.info("Waiting for NN to issue block deletions to DNs");
        return nn.getNamesystem().getBlockManager()
            .getPendingDeletionBlocksCount() == 0;
      }
    }, 250, 10000);
  }
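
  /*
   * Usage sketch of the typical delete-then-wait pattern these two helpers
   * support (assumes a MiniDFSCluster and a FileSystem fs bound to it; the
   * path is illustrative):
   *
   *   fs.delete(new Path("/test/file"), true);
   *   HATestUtil.waitForNNToIssueDeletions(cluster.getNameNode(0));
   *   cluster.triggerHeartbeats();
   *   HATestUtil.waitForDNDeletions(cluster);
   */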

  public static class CouldNotCatchUpException extends IOException {
    private static final long serialVersionUID = 1L;

    public CouldNotCatchUpException(String message) {
      super(message);
    }
  }

  /** Gets the filesystem instance by setting the failover configurations. */
  public static FileSystem configureFailoverFs(MiniDFSCluster cluster,
      Configuration conf) throws IOException, URISyntaxException {
    return configureFailoverFs(cluster, conf, 0);
  }

  /**
   * Gets the filesystem instance by setting the failover configurations.
   * @param cluster the single process DFS cluster
   * @param conf cluster configuration
   * @param nsIndex namespace index starting from zero
   * @throws IOException if an error occurs while creating the filesystem
   */
  public static FileSystem configureFailoverFs(MiniDFSCluster cluster,
      Configuration conf, int nsIndex)
      throws IOException, URISyntaxException {
    conf = new Configuration(conf);
    String logicalName = getLogicalHostname(cluster);
    setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
    FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
    return fs;
  }
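
  /*
   * Usage sketch: assuming an HA MiniDFSCluster built with
   * MiniDFSNNTopology.simpleHATopology(), a test can obtain a client that
   * follows failovers transparently:
   *
   *   FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
   *   fs.mkdirs(new Path("/test"));  // retried against the other NN on failover
   */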

  /** Sets the required failover configurations, deriving the logical name
   *  from the cluster instance. */
  public static void setFailoverConfigurations(MiniDFSCluster cluster,
      Configuration conf) {
    setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster));
  }

  /** Sets the required configurations for performing failover of the
   *  default namespace. */
  public static void setFailoverConfigurations(MiniDFSCluster cluster,
      Configuration conf, String logicalName) {
    setFailoverConfigurations(cluster, conf, logicalName, 0);
  }

  /** Sets the required configurations for performing failover. */
  public static void setFailoverConfigurations(MiniDFSCluster cluster,
      Configuration conf, String logicalName, int nsIndex) {
    // Each nameservice is assumed to have exactly two NameNodes, so the
    // NameNodes of nameservice i sit at cluster indices 2*i and 2*i + 1.
    InetSocketAddress nnAddr1 =
        cluster.getNameNode(2 * nsIndex).getNameNodeAddress();
    InetSocketAddress nnAddr2 =
        cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress();
    setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2);
  }

  /**
   * Sets the required configurations for performing failover.
   */
  public static void setFailoverConfigurations(Configuration conf,
      String logicalName, InetSocketAddress nnAddr1,
      InetSocketAddress nnAddr2) {
    String nameNodeId1 = "nn1";
    String nameNodeId2 = "nn2";
    String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
    String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
        logicalName, nameNodeId1), address1);
    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
        logicalName, nameNodeId2), address2);

    conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
        nameNodeId1 + "," + nameNodeId2);
    conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
        ConfiguredFailoverProxyProvider.class.getName());
    conf.set("fs.defaultFS", "hdfs://" + logicalName);
  }
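
  /*
   * For a logical name of "ha-nn-uri-0", the method above produces
   * client-side configuration along these lines (host and ports are
   * illustrative):
   *
   *   dfs.nameservices = ha-nn-uri-0
   *   dfs.ha.namenodes.ha-nn-uri-0 = nn1,nn2
   *   dfs.namenode.rpc-address.ha-nn-uri-0.nn1 = hdfs://localhost:10001
   *   dfs.namenode.rpc-address.ha-nn-uri-0.nn2 = hdfs://localhost:10002
   *   dfs.client.failover.proxy.provider.ha-nn-uri-0 =
   *       org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
   *   fs.defaultFS = hdfs://ha-nn-uri-0
   */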

  /** Returns the logical hostname used for this cluster's HA URI. */
  public static String getLogicalHostname(MiniDFSCluster cluster) {
    return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
  }

  /** Returns the logical HA URI for the given cluster. */
  public static URI getLogicalUri(MiniDFSCluster cluster)
      throws URISyntaxException {
    return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
        getLogicalHostname(cluster));
  }

  /**
   * Wait up to 10 seconds for the NameNode at the given index to have
   * checkpoints at the given txids, re-checking every 300ms and rethrowing
   * the assertion failure if the timeout expires.
   */
  public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
      List<Integer> txids) throws InterruptedException {
    long start = Time.now();
    while (true) {
      try {
        FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
        return;
      } catch (AssertionError err) {
        if (Time.now() - start > 10000) {
          throw err;
        } else {
          Thread.sleep(300);
        }
      }
    }
  }
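
  /*
   * Usage sketch: wait for the NameNode at index 1 to have saved a checkpoint
   * at transaction id 12 (the txid value is illustrative):
   *
   *   HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
   */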
}