1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.namenode.ha; 19 20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; 21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; 22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; 23 24 import java.io.IOException; 25 import java.net.InetSocketAddress; 26 import java.net.URI; 27 import java.net.URISyntaxException; 28 import java.util.List; 29 import java.util.concurrent.TimeoutException; 30 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.FileSystem; 35 import org.apache.hadoop.hdfs.DFSConfigKeys; 36 import org.apache.hadoop.hdfs.DFSUtil; 37 import org.apache.hadoop.hdfs.MiniDFSCluster; 38 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 39 import org.apache.hadoop.hdfs.server.datanode.DataNode; 40 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; 41 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; 42 import org.apache.hadoop.hdfs.server.namenode.NameNode; 43 import org.apache.hadoop.test.GenericTestUtils; 44 import org.apache.hadoop.util.Time; 45 46 import com.google.common.base.Supplier; 47 48 /** 49 * Static utility functions useful for testing HA. 50 */ 51 public abstract class HATestUtil { 52 private static final Log LOG = LogFactory.getLog(HATestUtil.class); 53 54 private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d"; 55 56 /** 57 * Trigger an edits log roll on the active and then wait for the standby to 58 * catch up to all the edits done by the active. This method will check 59 * repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail throwing 60 * {@link CouldNotCatchUpException} 61 * 62 * @param active active NN 63 * @param standby standby NN which should catch up to active 64 * @throws IOException if an error occurs rolling the edit log 65 * @throws CouldNotCatchUpException if the standby doesn't catch up to the 66 * active in NN_LAG_TIMEOUT milliseconds 67 */ waitForStandbyToCatchUp(NameNode active, NameNode standby)68 public static void waitForStandbyToCatchUp(NameNode active, 69 NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException { 70 71 long activeTxId = active.getNamesystem().getFSImage().getEditLog() 72 .getLastWrittenTxId(); 73 74 active.getRpcServer().rollEditLog(); 75 76 long start = Time.now(); 77 while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) { 78 long nn2HighestTxId = standby.getNamesystem().getFSImage() 79 .getLastAppliedTxId(); 80 if (nn2HighestTxId >= activeTxId) { 81 return; 82 } 83 Thread.sleep(TestEditLogTailer.SLEEP_TIME); 84 } 85 throw new CouldNotCatchUpException("Standby did not catch up to txid " + 86 activeTxId + " (currently at " + 87 standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")"); 88 } 89 90 /** 91 * Wait for the datanodes in the cluster to process any block 92 * deletions that have already been asynchronously queued. 93 */ waitForDNDeletions(final MiniDFSCluster cluster)94 public static void waitForDNDeletions(final MiniDFSCluster cluster) 95 throws TimeoutException, InterruptedException { 96 GenericTestUtils.waitFor(new Supplier<Boolean>() { 97 @Override 98 public Boolean get() { 99 for (DataNode dn : cluster.getDataNodes()) { 100 if (DataNodeTestUtils.getPendingAsyncDeletions(dn) > 0) { 101 return false; 102 } 103 } 104 return true; 105 } 106 }, 1000, 10000); 107 108 } 109 110 /** 111 * Wait for the NameNode to issue any deletions that are already 112 * pending (i.e. for the pendingDeletionBlocksCount to go to 0) 113 */ waitForNNToIssueDeletions(final NameNode nn)114 public static void waitForNNToIssueDeletions(final NameNode nn) 115 throws Exception { 116 GenericTestUtils.waitFor(new Supplier<Boolean>() { 117 @Override 118 public Boolean get() { 119 LOG.info("Waiting for NN to issue block deletions to DNs"); 120 return nn.getNamesystem().getBlockManager().getPendingDeletionBlocksCount() == 0; 121 } 122 }, 250, 10000); 123 } 124 125 public static class CouldNotCatchUpException extends IOException { 126 private static final long serialVersionUID = 1L; 127 CouldNotCatchUpException(String message)128 public CouldNotCatchUpException(String message) { 129 super(message); 130 } 131 } 132 133 /** Gets the filesystem instance by setting the failover configurations */ configureFailoverFs(MiniDFSCluster cluster, Configuration conf)134 public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf) 135 throws IOException, URISyntaxException { 136 return configureFailoverFs(cluster, conf, 0); 137 } 138 139 /** 140 * Gets the filesystem instance by setting the failover configurations 141 * @param cluster the single process DFS cluster 142 * @param conf cluster configuration 143 * @param nsIndex namespace index starting with zero 144 * @throws IOException if an error occurs rolling the edit log 145 */ configureFailoverFs(MiniDFSCluster cluster, Configuration conf, int nsIndex)146 public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf, 147 int nsIndex) throws IOException, URISyntaxException { 148 conf = new Configuration(conf); 149 String logicalName = getLogicalHostname(cluster); 150 setFailoverConfigurations(cluster, conf, logicalName, nsIndex); 151 FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); 152 return fs; 153 } 154 setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf)155 public static void setFailoverConfigurations(MiniDFSCluster cluster, 156 Configuration conf) { 157 setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster)); 158 } 159 160 /** Sets the required configurations for performing failover of default namespace. */ setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName)161 public static void setFailoverConfigurations(MiniDFSCluster cluster, 162 Configuration conf, String logicalName) { 163 setFailoverConfigurations(cluster, conf, logicalName, 0); 164 } 165 166 /** Sets the required configurations for performing failover. */ setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName, int nsIndex)167 public static void setFailoverConfigurations(MiniDFSCluster cluster, 168 Configuration conf, String logicalName, int nsIndex) { 169 InetSocketAddress nnAddr1 = cluster.getNameNode(2 * nsIndex).getNameNodeAddress(); 170 InetSocketAddress nnAddr2 = cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress(); 171 setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2); 172 } 173 174 /** 175 * Sets the required configurations for performing failover 176 */ setFailoverConfigurations(Configuration conf, String logicalName, InetSocketAddress nnAddr1, InetSocketAddress nnAddr2)177 public static void setFailoverConfigurations(Configuration conf, 178 String logicalName, InetSocketAddress nnAddr1, 179 InetSocketAddress nnAddr2) { 180 String nameNodeId1 = "nn1"; 181 String nameNodeId2 = "nn2"; 182 String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort(); 183 String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort(); 184 conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, 185 logicalName, nameNodeId1), address1); 186 conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, 187 logicalName, nameNodeId2), address2); 188 189 conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); 190 conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName), 191 nameNodeId1 + "," + nameNodeId2); 192 conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, 193 ConfiguredFailoverProxyProvider.class.getName()); 194 conf.set("fs.defaultFS", "hdfs://" + logicalName); 195 } 196 197 getLogicalHostname(MiniDFSCluster cluster)198 public static String getLogicalHostname(MiniDFSCluster cluster) { 199 return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); 200 } 201 getLogicalUri(MiniDFSCluster cluster)202 public static URI getLogicalUri(MiniDFSCluster cluster) 203 throws URISyntaxException { 204 return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + 205 getLogicalHostname(cluster)); 206 } 207 waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, List<Integer> txids)208 public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, 209 List<Integer> txids) throws InterruptedException { 210 long start = Time.now(); 211 while (true) { 212 try { 213 FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids); 214 return; 215 } catch (AssertionError err) { 216 if (Time.now() - start > 10000) { 217 throw err; 218 } else { 219 Thread.sleep(300); 220 } 221 } 222 } 223 } 224 } 225