/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.balancer;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.junit.Assert;
import org.junit.Test;

/**
 * This class tests if a balancer schedules tasks correctly when the cluster
 * uses a node-group aware network topology and block placement policy.
 */
public class TestBalancerWithNodeGroup {
  private static final Log LOG = LogFactory.getLog(
      "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");

  final private static long CAPACITY = 5000L;
  final private static String RACK0 = "/rack0";
  final private static String RACK1 = "/rack1";
  final private static String NODEGROUP0 = "/nodegroup0";
  final private static String NODEGROUP1 = "/nodegroup1";
  final private static String NODEGROUP2 = "/nodegroup2";
  final static private String fileName = "/tmp.txt";
  final static private Path filePath = new Path(fileName);
  MiniDFSClusterWithNodeGroup cluster;

  ClientProtocol client;

  static final long TIMEOUT = 40000L; //msec
  static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
  static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
  static final int DEFAULT_BLOCK_SIZE = 100;

  /** Interval between two polls of the namenode while waiting, in msec. */
  private static final long POLL_INTERVAL_MS = 100L;

  static {
    TestBalancer.initTestSetup();
  }

  /**
   * Build the configuration shared by all tests in this class: the common
   * balancer test settings, a small block size, and the node-group aware
   * topology and block placement policy implementations.
   *
   * @return a freshly created configuration
   */
  static Configuration createConf() {
    Configuration conf = new HdfsConfiguration();
    TestBalancer.initConf(conf);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
        NetworkTopologyWithNodeGroup.class.getName());
    conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyWithNodeGroup.class.getName());
    return conf;
  }

  /**
   * Sleep between two polls. An interrupt is not swallowed: the interrupt
   * flag is restored and the wait is aborted, so that an interrupted test
   * thread stops polling and reaches its cleanup code instead of spinning
   * until the polling deadline.
   *
   * @param millis how long to sleep, in msec
   * @throws InterruptedIOException if the thread is interrupted while sleeping
   */
  private static void pause(long millis) throws InterruptedIOException {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      InterruptedIOException iioe = new InterruptedIOException(
          "Interrupted while waiting for the cluster to reach the expected state");
      iioe.initCause(e);
      throw iioe;
    }
  }

  /**
   * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
   * summed over all nodes.  Times out after TIMEOUT msec.
   *
   * @param expectedUsedSpace the used space the cluster is expected to report
   * @param expectedTotalSpace the total capacity the cluster is expected to report
   * @throws IOException if getStats() fails or the wait is interrupted
   * @throws TimeoutException if the expected values are not reached in time
   */
  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
      throws IOException, TimeoutException {
    long timeout = TIMEOUT;
    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
        : System.currentTimeMillis() + timeout;

    while (true) {
      // status[0] is compared against the total space, status[1] against
      // the used space.
      long[] status = client.getStats();
      double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
          / expectedTotalSpace;
      double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
          / expectedUsedSpace;
      if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
          && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) {
        break; //done
      }

      if (System.currentTimeMillis() > failtime) {
        throw new TimeoutException("Cluster failed to reached expected values of "
            + "totalSpace (current: " + status[0]
            + ", expected: " + expectedTotalSpace
            + "), or usedSpace (current: " + status[1]
            + ", expected: " + expectedUsedSpace
            + "), in more than " + timeout + " msec.");
      }
      pause(POLL_INTERVAL_MS);
    }
  }

  /**
   * Wait until balanced: each datanode gives utilization within
   * BALANCE_ALLOWED_VARIANCE of average.
   *
   * @param totalUsedSpace the used space summed over the whole cluster
   * @param totalCapacity the capacity summed over the whole cluster
   * @throws IOException if the datanode report cannot be fetched or the wait
   *         is interrupted
   * @throws TimeoutException if some datanode is still out of balance after
   *         TIMEOUT msec
   */
  private void waitForBalancer(long totalUsedSpace, long totalCapacity)
      throws IOException, TimeoutException {
    long timeout = TIMEOUT;
    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
        : System.currentTimeMillis() + timeout;
    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
    boolean balanced;
    do {
      DatanodeInfo[] datanodeReport =
          client.getDatanodeReport(DatanodeReportType.ALL);
      // expected value first, so a failure message reads correctly
      assertEquals(cluster.getDataNodes().size(), datanodeReport.length);
      balanced = true;
      for (DatanodeInfo datanode : datanodeReport) {
        double nodeUtilization = ((double)datanode.getDfsUsed())
            / datanode.getCapacity();
        if (Math.abs(avgUtilization - nodeUtilization) >
            BALANCE_ALLOWED_VARIANCE) {
          balanced = false;
          if (System.currentTimeMillis() > failtime) {
            throw new TimeoutException(
                "Rebalancing expected avg utilization to become "
                + avgUtilization + ", but on datanode " + datanode
                + " it remains at " + nodeUtilization
                + " after more than " + TIMEOUT + " msec.");
          }
          pause(POLL_INTERVAL_MS);
          // one unbalanced node is enough; fetch a fresh report and retry
          break;
        }
      }
    } while (!balanced);
  }

  /**
   * Run the balancer with default parameters, require it to exit with
   * SUCCESS, and then wait until every datanode is balanced.
   *
   * @param conf configuration used to locate the namenodes
   * @param totalUsedSpace the used space expected over the whole cluster
   * @param totalCapacity the capacity expected over the whole cluster
   * @throws Exception if the balancer or any of the waits fails
   */
  private void runBalancer(Configuration conf,
      long totalUsedSpace, long totalCapacity) throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity);

    // start rebalancing
    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
    assertEquals(ExitStatus.SUCCESS.getExitCode(), r);

    waitForHeartBeat(totalUsedSpace, totalCapacity);
    LOG.info("Rebalancing with default factor.");
    waitForBalancer(totalUsedSpace, totalCapacity);
  }

  /**
   * Run the balancer with default parameters and only require that it
   * terminates, either with SUCCESS or with NO_MOVE_PROGRESS. Unlike
   * {@link #runBalancer}, this does not wait for the datanodes to become
   * balanced.
   *
   * @param conf configuration used to locate the namenodes
   * @param totalUsedSpace the used space expected over the whole cluster
   * @param totalCapacity the capacity expected over the whole cluster
   * @throws Exception if the balancer or any of the waits fails
   */
  private void runBalancerCanFinish(Configuration conf,
      long totalUsedSpace, long totalCapacity) throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity);

    // start rebalancing
    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
    Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
        (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
    waitForHeartBeat(totalUsedSpace, totalCapacity);
    LOG.info("Rebalancing with default factor.");
  }

  /**
   * Collect the blocks that have at least one replica on the given rack.
   *
   * @param blks the located blocks to inspect
   * @param rack the rack to match against the first half of each replica's
   *        network location
   * @return the set of blocks with a replica on {@code rack}
   */
  private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
    Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>();
    for (LocatedBlock blk : blks) {
      for (DatanodeInfo di : blk.getLocations()) {
        if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
          ret.add(blk.getBlock());
          break;
        }
      }
    }
    return ret;
  }

  /**
   * Create a cluster with even distribution, and a new empty node is added to
   * the cluster, then test rack locality for balancer policy.
   */
  @Test(timeout=60000)
  public void testBalancerWithRackLocality() throws Exception {
    Configuration conf = createConf();
    long[] capacities = new long[]{CAPACITY, CAPACITY};
    String[] racks = new String[]{RACK0, RACK1};
    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};

    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
                                .numDataNodes(capacities.length)
                                .racks(racks)
                                .simulatedCapacities(capacities);
    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
    cluster = new MiniDFSClusterWithNodeGroup(builder);
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf,
          cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      long totalCapacity = TestBalancer.sum(capacities);

      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      long length = totalUsedSpace / numOfDatanodes;
      TestBalancer.createFile(cluster, filePath, length,
          (short) numOfDatanodes, 0);

      LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0,
          length);
      Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);

      long newCapacity = CAPACITY;
      String newRack = RACK1;
      String newNodeGroup = NODEGROUP2;
      // start up an empty node with the same capacity and on the same rack
      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
          new long[] {newCapacity}, new String[]{newNodeGroup});

      totalCapacity += newCapacity;

      // run balancer and validate results
      runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);

      // the set of blocks with a replica on RACK0 must be unchanged
      lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
      Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
      assertEquals(before, after);

    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Create a cluster with even distribution, and a new empty node is added to
   * the cluster, then test node-group locality for balancer policy.
   */
  @Test(timeout=60000)
  public void testBalancerWithNodeGroup() throws Exception {
    Configuration conf = createConf();
    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
    String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};

    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    assertEquals(numOfDatanodes, nodeGroups.length);
    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
                                .numDataNodes(capacities.length)
                                .racks(racks)
                                .simulatedCapacities(capacities);
    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
    cluster = new MiniDFSClusterWithNodeGroup(builder);
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf,
          cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      long totalCapacity = TestBalancer.sum(capacities);
      // fill up the cluster to be 20% full
      long totalUsedSpace = totalCapacity * 2 / 10;
      TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2),
          (short) (numOfDatanodes/2), 0);

      long newCapacity = CAPACITY;
      String newRack = RACK1;
      String newNodeGroup = NODEGROUP2;
      // start up an empty node with the same capacity and on NODEGROUP2
      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
          new long[] {newCapacity}, new String[]{newNodeGroup});

      totalCapacity += newCapacity;

      // run balancer and validate results
      runBalancer(conf, totalUsedSpace, totalCapacity);

    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
   * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
   * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
   * to replica placement policy with NodeGroup. As a result, n2 and n3 will be
   * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
   * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
   * to end in 5 iterations without move block process.
   */
  @Test(timeout=60000)
  public void testBalancerEndInNoMoveProgress() throws Exception {
    Configuration conf = createConf();
    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
    String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};

    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    assertEquals(numOfDatanodes, nodeGroups.length);
    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
                                .numDataNodes(capacities.length)
                                .racks(racks)
                                .simulatedCapacities(capacities);
    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
    cluster = new MiniDFSClusterWithNodeGroup(builder);
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf,
          cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      long totalCapacity = TestBalancer.sum(capacities);
      // fill up the cluster to be 60% full
      long totalUsedSpace = totalCapacity * 6 / 10;
      TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
          (short) (3), 0);

      // run balancer which can finish in 5 iterations with no block movement.
      runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);

    } finally {
      cluster.shutdown();
    }
  }
}