1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.balancer; 19 20 import static org.junit.Assert.assertEquals; 21 22 import java.io.IOException; 23 import java.util.HashMap; 24 import java.util.Map; 25 import java.util.Random; 26 import java.util.concurrent.TimeoutException; 27 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import org.apache.hadoop.conf.Configuration; 31 import org.apache.hadoop.fs.FileSystem; 32 import org.apache.hadoop.fs.Path; 33 import org.apache.hadoop.hdfs.DFSClient; 34 import org.apache.hadoop.hdfs.DFSTestUtil; 35 import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup; 36 import org.apache.hadoop.hdfs.protocol.ClientProtocol; 37 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 38 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; 39 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; 40 import org.apache.hadoop.net.NetworkTopology; 41 import org.junit.Assert; 42 import org.junit.Test; 43 44 /** 45 * This class tests if a balancer schedules tasks correctly. 46 */ 47 public class TestBalancerWithNodeGroup { 48 49 final private static long CAPACITY = 500L; 50 final private static String RACK0 = "/rack0"; 51 final private static String RACK1 = "/rack1"; 52 final private static String NODEGROUP0 = "/nodegroup0"; 53 final private static String NODEGROUP1 = "/nodegroup1"; 54 final private static String NODEGROUP2 = "/nodegroup2"; 55 final static private String fileName = "/tmp.txt"; 56 final static private Path filePath = new Path(fileName); 57 private static final Log LOG = LogFactory.getLog( 58 "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup"); 59 MiniDFSClusterWithNodeGroup cluster; 60 61 ClientProtocol client; 62 private Balancer balancer; 63 64 static final long TIMEOUT = 20000L; //msec 65 static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% 66 static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta 67 static final int DEFAULT_BLOCK_SIZE = 5; 68 private static final Random r = new Random(); 69 70 static { 71 Balancer.setBlockMoveWaitTime(1000L) ; 72 } 73 initConf(Configuration conf)74 private void initConf(Configuration conf) { 75 conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); 76 conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE); 77 conf.setLong("dfs.heartbeat.interval", 1L); 78 conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); 79 conf.setLong("dfs.balancer.movedWinWidth", 2000L); 80 conf.set("net.topology.impl", "org.apache.hadoop.net.NetworkTopologyWithNodeGroup"); 81 conf.set("dfs.block.replicator.classname", 82 "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup"); 83 } 84 85 /* create a file with a length of <code>fileLen</code> */ createFile(long fileLen, short replicationFactor)86 private void createFile(long fileLen, short replicationFactor) 87 throws IOException { 88 FileSystem fs = cluster.getFileSystem(); 89 DFSTestUtil.createFile(fs, filePath, fileLen, 90 replicationFactor, r.nextLong()); 91 DFSTestUtil.waitReplication(fs, filePath, replicationFactor); 92 } 93 94 // create and initiate conf for balancer createConf()95 private Configuration createConf() { 96 Configuration conf = new Configuration(); 97 initConf(conf); 98 return conf; 99 } 100 101 /** 102 * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, 103 * summed over all nodes. Times out after TIMEOUT msec. 104 * @param expectedUsedSpace 105 * @param expectedTotalSpace 106 * @throws IOException - if getStats() fails 107 * @throws TimeoutException 108 */ waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)109 private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace) 110 throws IOException, TimeoutException { 111 long timeout = TIMEOUT; 112 long failtime = (timeout <= 0L) ? Long.MAX_VALUE 113 : System.currentTimeMillis() + timeout; 114 115 while (true) { 116 long[] status = client.getStats(); 117 double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) 118 / expectedTotalSpace; 119 double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) 120 / expectedUsedSpace; 121 if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE 122 && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) 123 break; //done 124 125 if (System.currentTimeMillis() > failtime) { 126 throw new TimeoutException("Cluster failed to reached expected values of " 127 + "totalSpace (current: " + status[0] 128 + ", expected: " + expectedTotalSpace 129 + "), or usedSpace (current: " + status[1] 130 + ", expected: " + expectedUsedSpace 131 + "), in more than " + timeout + " msec."); 132 } 133 try { 134 Thread.sleep(100L); 135 } catch(InterruptedException ignored) { 136 } 137 } 138 } 139 140 /** 141 * Wait until balanced: each datanode gives utilization within 142 * BALANCE_ALLOWED_VARIANCE of average 143 * @throws IOException 144 * @throws TimeoutException 145 */ waitForBalancer(long totalUsedSpace, long totalCapacity)146 private void waitForBalancer(long totalUsedSpace, long totalCapacity) 147 throws IOException, TimeoutException { 148 long timeout = TIMEOUT; 149 long failtime = (timeout <= 0L) ? Long.MAX_VALUE 150 : System.currentTimeMillis() + timeout; 151 final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; 152 boolean balanced; 153 do { 154 DatanodeInfo[] datanodeReport = 155 client.getDatanodeReport(DatanodeReportType.ALL); 156 assertEquals(datanodeReport.length, cluster.getDataNodes().size()); 157 balanced = true; 158 for (DatanodeInfo datanode : datanodeReport) { 159 double nodeUtilization = ((double)datanode.getDfsUsed()) 160 / datanode.getCapacity(); 161 if (Math.abs(avgUtilization - nodeUtilization) > 162 BALANCE_ALLOWED_VARIANCE) { 163 balanced = false; 164 if (System.currentTimeMillis() > failtime) { 165 throw new TimeoutException( 166 "Rebalancing expected avg utilization to become " 167 + avgUtilization + ", but on datanode " + datanode 168 + " it remains at " + nodeUtilization 169 + " after more than " + TIMEOUT + " msec."); 170 } 171 try { 172 Thread.sleep(100); 173 } catch (InterruptedException ignored) { 174 } 175 break; 176 } 177 } 178 } while (!balanced); 179 } 180 181 /** Start balancer and check if the cluster is balanced after the run */ runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity)182 private void runBalancer(Configuration conf, 183 long totalUsedSpace, long totalCapacity) throws Exception { 184 waitForHeartBeat(totalUsedSpace, totalCapacity); 185 186 // start rebalancing 187 balancer = new Balancer(conf); 188 189 final int r = balancer.run(new String[0]); 190 191 assertEquals(Balancer.SUCCESS, r); 192 193 waitForHeartBeat(totalUsedSpace, totalCapacity); 194 LOG.info("Rebalancing with default factor."); 195 waitForBalancer(totalUsedSpace, totalCapacity); 196 } 197 runBalancerCanFinish(Configuration conf, long totalUsedSpace, long totalCapacity)198 private void runBalancerCanFinish(Configuration conf, 199 long totalUsedSpace, long totalCapacity) throws Exception { 200 waitForHeartBeat(totalUsedSpace, totalCapacity); 201 202 balancer = new Balancer(conf); 203 final int r = balancer.run(new String[0]); 204 Assert.assertTrue(r == Balancer.SUCCESS || 205 (r == Balancer.NO_MOVE_PROGRESS)); 206 waitForHeartBeat(totalUsedSpace, totalCapacity); 207 LOG.info("Rebalancing ends successful."); 208 } 209 210 /** 211 * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2) 212 * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster 213 * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according 214 * to replica placement policy with NodeGroup. As a result, n2 and n3 will be 215 * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3 216 * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer 217 * to end in 5 iterations without move block process. 218 */ 219 @Test(timeout=60000) testBalancerEndInNoMoveProgress()220 public void testBalancerEndInNoMoveProgress() throws Exception { 221 Configuration conf = createConf(); 222 long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY}; 223 String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1}; 224 String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2}; 225 226 int numOfDatanodes = capacities.length; 227 assertEquals(numOfDatanodes, racks.length); 228 assertEquals(numOfDatanodes, nodeGroups.length); 229 MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); 230 cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length, 231 true, true, null, racks, capacities); 232 try { 233 cluster.waitActive(); 234 client = DFSClient.createNamenode(conf); 235 236 long totalCapacity = 0L; 237 for(long capacity : capacities) { 238 totalCapacity += capacity; 239 } 240 241 // fill up the cluster to be 60% full 242 long totalUsedSpace = totalCapacity * 6 / 10; 243 244 createFile(totalUsedSpace / 3, (short) 3); 245 246 // run balancer which can finish in 5 iterations with no block movement. 247 runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); 248 249 } finally { 250 cluster.shutdown(); 251 } 252 } 253 254 /** 255 * Create a cluster with even distribution, and a new empty node is added to 256 * the cluster, then test rack locality for balancer policy. 257 */ 258 @Test(timeout=60000) testBalancerWithRackLocality()259 public void testBalancerWithRackLocality() throws Exception { 260 Configuration conf = createConf(); 261 long[] capacities = new long[]{CAPACITY, CAPACITY}; 262 String[] racks = new String[]{RACK0, RACK1}; 263 String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1}; 264 265 int numOfDatanodes = capacities.length; 266 assertEquals(numOfDatanodes, racks.length); 267 MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); 268 269 MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); 270 cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length, 271 true, true, null, racks, capacities); 272 try { 273 cluster.waitActive(); 274 275 client = DFSClient.createNamenode(conf); 276 277 long totalCapacity = 0L; 278 for(long capacity : capacities) { 279 totalCapacity += capacity; 280 } 281 282 // fill up the cluster to be 30% full 283 long totalUsedSpace = totalCapacity * 3 / 10; 284 createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes); 285 286 long newCapacity = CAPACITY; 287 String newRack = RACK1; 288 String newNodeGroup = NODEGROUP2; 289 290 // start up an empty node with the same capacity and on the same rack 291 cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, 292 new String[]{newNodeGroup}, new long[] {newCapacity}); 293 294 totalCapacity += newCapacity; 295 296 // run balancer and validate results 297 runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); 298 299 DatanodeInfo[] datanodeReport = 300 client.getDatanodeReport(DatanodeReportType.ALL); 301 302 Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>(); 303 for (DatanodeInfo datanode: datanodeReport) { 304 String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation()); 305 int usedCapacity = (int) datanode.getDfsUsed(); 306 307 if (rackToUsedCapacity.get(rack) != null) { 308 rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack)); 309 } else { 310 rackToUsedCapacity.put(rack, usedCapacity); 311 } 312 } 313 assertEquals(rackToUsedCapacity.size(), 2); 314 assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1)); 315 316 } finally { 317 cluster.shutdown(); 318 } 319 } 320 321 /** Create a cluster with even distribution, and a new empty node is added to 322 * the cluster, then test rack locality for balancer policy. 323 **/ 324 @Test(timeout=60000) testBalancerWithNodeGroup()325 public void testBalancerWithNodeGroup() throws Exception { 326 Configuration conf = createConf(); 327 long[] capacities = new long[]{CAPACITY, CAPACITY}; 328 String[] racks = new String[]{RACK0, RACK1}; 329 String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1}; 330 331 int numOfDatanodes = capacities.length; 332 assertEquals(numOfDatanodes, racks.length); 333 MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); 334 cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length, 335 true, true, null, racks, capacities); 336 try { 337 cluster.waitActive(); 338 client = DFSClient.createNamenode(conf); 339 340 long totalCapacity = 0L; 341 for(long capacity : capacities) { 342 totalCapacity += capacity; 343 } 344 345 // fill up the cluster to be 30% full 346 long totalUsedSpace = totalCapacity*3/10; 347 348 createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes); 349 350 long newCapacity = CAPACITY; 351 String newRack = RACK1; 352 String newNodeGroup = NODEGROUP2; 353 // start up an empty node with the same capacity and on the same rack 354 cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, 355 new String[]{newNodeGroup}, new long[] {newCapacity}); 356 357 totalCapacity += newCapacity; 358 359 // run balancer and validate results 360 runBalancer(conf, totalUsedSpace, totalCapacity); 361 362 } finally { 363 cluster.shutdown(); 364 } 365 } 366 367 }