1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.balancer; 19 20 import static org.apache.hadoop.fs.StorageType.DEFAULT; 21 import static org.apache.hadoop.fs.StorageType.RAM_DISK; 22 import static org.apache.hadoop.hdfs.DFSConfigKeys.*; 23 import static org.junit.Assert.assertEquals; 24 import static org.junit.Assert.assertTrue; 25 import static org.junit.Assert.fail; 26 import static org.junit.Assume.assumeTrue; 27 28 import java.io.File; 29 import java.io.IOException; 30 import java.io.PrintWriter; 31 import java.net.InetAddress; 32 import java.net.URI; 33 import java.net.InetSocketAddress; 34 import java.util.ArrayList; 35 import java.util.Arrays; 36 import java.util.Collection; 37 import java.util.Collections; 38 import java.util.HashSet; 39 import java.util.List; 40 import java.util.Random; 41 import java.util.Set; 42 import java.util.concurrent.TimeoutException; 43 44 import org.apache.commons.lang.StringUtils; 45 import org.apache.commons.logging.Log; 46 import org.apache.commons.logging.LogFactory; 47 import org.apache.commons.logging.impl.Log4JLogger; 48 import org.apache.hadoop.conf.Configuration; 49 import org.apache.hadoop.fs.FSDataOutputStream; 50 import org.apache.hadoop.fs.FileSystem; 51 import org.apache.hadoop.fs.Path; 52 import org.apache.hadoop.fs.StorageType; 53 import org.apache.hadoop.fs.permission.FsPermission; 54 import org.apache.hadoop.hdfs.DFSClient; 55 import org.apache.hadoop.hdfs.DFSConfigKeys; 56 import org.apache.hadoop.hdfs.DFSTestUtil; 57 import org.apache.hadoop.hdfs.DFSUtil; 58 import org.apache.hadoop.hdfs.DistributedFileSystem; 59 import org.apache.hadoop.hdfs.HdfsConfiguration; 60 import org.apache.hadoop.hdfs.MiniDFSCluster; 61 import org.apache.hadoop.hdfs.NameNodeProxies; 62 import org.apache.hadoop.hdfs.protocol.*; 63 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; 64 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; 65 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; 66 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; 67 import org.apache.hadoop.hdfs.server.datanode.DataNode; 68 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; 69 import org.apache.hadoop.io.IOUtils; 70 import org.apache.hadoop.test.GenericTestUtils; 71 import org.apache.hadoop.util.Time; 72 import org.apache.hadoop.util.Tool; 73 import org.apache.log4j.Level; 74 import org.junit.Test; 75 76 /** 77 * This class tests if a balancer schedules tasks correctly. 78 */ 79 public class TestBalancer { 80 private static final Log LOG = LogFactory.getLog(TestBalancer.class); 81 82 static { 83 ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL); 84 } 85 86 final static long CAPACITY = 5000L; 87 final static String RACK0 = "/rack0"; 88 final static String RACK1 = "/rack1"; 89 final static String RACK2 = "/rack2"; 90 final private static String fileName = "/tmp.txt"; 91 final static Path filePath = new Path(fileName); 92 private MiniDFSCluster cluster; 93 94 ClientProtocol client; 95 96 static final long TIMEOUT = 40000L; //msec 97 static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% 98 static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta 99 static final int DEFAULT_BLOCK_SIZE = 100; 100 static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024; 101 private static final Random r = new Random(); 102 103 static { initTestSetup()104 initTestSetup(); 105 } 106 initTestSetup()107 public static void initTestSetup() { 108 Dispatcher.setBlockMoveWaitTime(1000L) ; 109 110 // do not create id file since it occupies the disk space 111 NameNodeConnector.setWrite2IdFile(false); 112 } 113 initConf(Configuration conf)114 static void initConf(Configuration conf) { 115 conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); 116 conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); 117 conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); 118 conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); 119 SimulatedFSDataset.setFactory(conf); 120 conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); 121 } 122 initConfWithRamDisk(Configuration conf)123 static void initConfWithRamDisk(Configuration conf) { 124 conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE); 125 conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3); 126 conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); 127 conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); 128 conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1); 129 conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE); 130 } 131 132 /* create a file with a length of <code>fileLen</code> */ createFile(MiniDFSCluster cluster, Path filePath, long fileLen, short replicationFactor, int nnIndex)133 static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, 134 short replicationFactor, int nnIndex) 135 throws IOException, InterruptedException, TimeoutException { 136 FileSystem fs = cluster.getFileSystem(nnIndex); 137 DFSTestUtil.createFile(fs, filePath, fileLen, 138 replicationFactor, r.nextLong()); 139 DFSTestUtil.waitReplication(fs, filePath, replicationFactor); 140 } 141 142 /* fill up a cluster with <code>numNodes</code> datanodes 143 * whose used space to be <code>size</code> 144 */ generateBlocks(Configuration conf, long size, short numNodes)145 private ExtendedBlock[] generateBlocks(Configuration conf, long size, 146 short numNodes) throws IOException, InterruptedException, TimeoutException { 147 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build(); 148 try { 149 cluster.waitActive(); 150 client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), 151 ClientProtocol.class).getProxy(); 152 153 short replicationFactor = (short)(numNodes-1); 154 long fileLen = size/replicationFactor; 155 createFile(cluster , filePath, fileLen, replicationFactor, 0); 156 157 List<LocatedBlock> locatedBlocks = client. 158 getBlockLocations(fileName, 0, fileLen).getLocatedBlocks(); 159 160 int numOfBlocks = locatedBlocks.size(); 161 ExtendedBlock[] blocks = new ExtendedBlock[numOfBlocks]; 162 for(int i=0; i<numOfBlocks; i++) { 163 ExtendedBlock b = locatedBlocks.get(i).getBlock(); 164 blocks[i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b 165 .getNumBytes(), b.getGenerationStamp()); 166 } 167 168 return blocks; 169 } finally { 170 cluster.shutdown(); 171 } 172 } 173 174 /* Distribute all blocks according to the given distribution */ distributeBlocks(ExtendedBlock[] blocks, short replicationFactor, final long[] distribution)175 static Block[][] distributeBlocks(ExtendedBlock[] blocks, 176 short replicationFactor, final long[] distribution) { 177 // make a copy 178 long[] usedSpace = new long[distribution.length]; 179 System.arraycopy(distribution, 0, usedSpace, 0, distribution.length); 180 181 List<List<Block>> blockReports = 182 new ArrayList<List<Block>>(usedSpace.length); 183 Block[][] results = new Block[usedSpace.length][]; 184 for(int i=0; i<usedSpace.length; i++) { 185 blockReports.add(new ArrayList<Block>()); 186 } 187 for(int i=0; i<blocks.length; i++) { 188 for(int j=0; j<replicationFactor; j++) { 189 boolean notChosen = true; 190 while(notChosen) { 191 int chosenIndex = r.nextInt(usedSpace.length); 192 if( usedSpace[chosenIndex]>0 ) { 193 notChosen = false; 194 blockReports.get(chosenIndex).add(blocks[i].getLocalBlock()); 195 usedSpace[chosenIndex] -= blocks[i].getNumBytes(); 196 } 197 } 198 } 199 } 200 for(int i=0; i<usedSpace.length; i++) { 201 List<Block> nodeBlockList = blockReports.get(i); 202 results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]); 203 } 204 return results; 205 } 206 sum(long[] x)207 static long sum(long[] x) { 208 long s = 0L; 209 for(long a : x) { 210 s += a; 211 } 212 return s; 213 } 214 215 /* we first start a cluster and fill the cluster up to a certain size. 216 * then redistribute blocks according the required distribution. 217 * Afterwards a balancer is running to balance the cluster. 218 */ testUnevenDistribution(Configuration conf, long distribution[], long capacities[], String[] racks)219 private void testUnevenDistribution(Configuration conf, 220 long distribution[], long capacities[], String[] racks) throws Exception { 221 int numDatanodes = distribution.length; 222 if (capacities.length != numDatanodes || racks.length != numDatanodes) { 223 throw new IllegalArgumentException("Array length is not the same"); 224 } 225 226 // calculate total space that need to be filled 227 final long totalUsedSpace = sum(distribution); 228 229 // fill the cluster 230 ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace, 231 (short) numDatanodes); 232 233 // redistribute blocks 234 Block[][] blocksDN = distributeBlocks( 235 blocks, (short)(numDatanodes-1), distribution); 236 237 // restart the cluster: do NOT format the cluster 238 conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); 239 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) 240 .format(false) 241 .racks(racks) 242 .simulatedCapacities(capacities) 243 .build(); 244 cluster.waitActive(); 245 client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), 246 ClientProtocol.class).getProxy(); 247 248 for(int i = 0; i < blocksDN.length; i++) 249 cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); 250 251 final long totalCapacity = sum(capacities); 252 runBalancer(conf, totalUsedSpace, totalCapacity); 253 cluster.shutdown(); 254 } 255 256 /** 257 * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, 258 * summed over all nodes. Times out after TIMEOUT msec. 259 * @param expectedUsedSpace 260 * @param expectedTotalSpace 261 * @throws IOException - if getStats() fails 262 * @throws TimeoutException 263 */ waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster)264 static void waitForHeartBeat(long expectedUsedSpace, 265 long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster) 266 throws IOException, TimeoutException { 267 long timeout = TIMEOUT; 268 long failtime = (timeout <= 0L) ? Long.MAX_VALUE 269 : Time.monotonicNow() + timeout; 270 271 while (true) { 272 long[] status = client.getStats(); 273 double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) 274 / expectedTotalSpace; 275 double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) 276 / expectedUsedSpace; 277 if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE 278 && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) 279 break; //done 280 281 if (Time.monotonicNow() > failtime) { 282 throw new TimeoutException("Cluster failed to reached expected values of " 283 + "totalSpace (current: " + status[0] 284 + ", expected: " + expectedTotalSpace 285 + "), or usedSpace (current: " + status[1] 286 + ", expected: " + expectedUsedSpace 287 + "), in more than " + timeout + " msec."); 288 } 289 try { 290 Thread.sleep(100L); 291 } catch(InterruptedException ignored) { 292 } 293 } 294 } 295 296 /** 297 * Wait until balanced: each datanode gives utilization within 298 * BALANCE_ALLOWED_VARIANCE of average 299 * @throws IOException 300 * @throws TimeoutException 301 */ waitForBalancer(long totalUsedSpace, long totalCapacity, ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)302 static void waitForBalancer(long totalUsedSpace, long totalCapacity, 303 ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p) 304 throws IOException, TimeoutException { 305 waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); 306 } 307 308 /** 309 * Make sure that balancer can't move pinned blocks. 310 * If specified favoredNodes when create file, blocks will be pinned use 311 * sticky bit. 312 * @throws Exception 313 */ 314 @Test(timeout=100000) testBalancerWithPinnedBlocks()315 public void testBalancerWithPinnedBlocks() throws Exception { 316 // This test assumes stick-bit based block pin mechanism available only 317 // in Linux/Unix. It can be unblocked on Windows when HDFS-7759 is ready to 318 // provide a different mechanism for Windows. 319 assumeTrue(!Path.WINDOWS); 320 321 final Configuration conf = new HdfsConfiguration(); 322 initConf(conf); 323 conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true); 324 325 long[] capacities = new long[] { CAPACITY, CAPACITY }; 326 String[] racks = { RACK0, RACK1 }; 327 int numOfDatanodes = capacities.length; 328 329 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) 330 .hosts(new String[]{"localhost", "localhost"}) 331 .racks(racks).simulatedCapacities(capacities).build(); 332 333 try { 334 cluster.waitActive(); 335 client = NameNodeProxies.createProxy(conf, 336 cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); 337 338 // fill up the cluster to be 80% full 339 long totalCapacity = sum(capacities); 340 long totalUsedSpace = totalCapacity * 8 / 10; 341 InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes]; 342 for (int i = 0; i < favoredNodes.length; i++) { 343 favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress(); 344 } 345 346 DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, 347 totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE, 348 (short) numOfDatanodes, 0, false, favoredNodes); 349 350 // start up an empty node with the same capacity 351 cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 }, 352 new long[] { CAPACITY }); 353 354 totalCapacity += CAPACITY; 355 356 // run balancer and validate results 357 waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); 358 359 // start rebalancing 360 Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); 361 int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); 362 assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); 363 364 } finally { 365 cluster.shutdown(); 366 } 367 368 } 369 370 /** 371 * Wait until balanced: each datanode gives utilization within 372 * BALANCE_ALLOWED_VARIANCE of average 373 * @throws IOException 374 * @throws TimeoutException 375 */ waitForBalancer(long totalUsedSpace, long totalCapacity, ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, int expectedExcludedNodes)376 static void waitForBalancer(long totalUsedSpace, long totalCapacity, 377 ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, 378 int expectedExcludedNodes) throws IOException, TimeoutException { 379 long timeout = TIMEOUT; 380 long failtime = (timeout <= 0L) ? Long.MAX_VALUE 381 : Time.monotonicNow() + timeout; 382 if (!p.nodesToBeIncluded.isEmpty()) { 383 totalCapacity = p.nodesToBeIncluded.size() * CAPACITY; 384 } 385 if (!p.nodesToBeExcluded.isEmpty()) { 386 totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY; 387 } 388 final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; 389 boolean balanced; 390 do { 391 DatanodeInfo[] datanodeReport = 392 client.getDatanodeReport(DatanodeReportType.ALL); 393 assertEquals(datanodeReport.length, cluster.getDataNodes().size()); 394 balanced = true; 395 int actualExcludedNodeCount = 0; 396 for (DatanodeInfo datanode : datanodeReport) { 397 double nodeUtilization = ((double)datanode.getDfsUsed()) 398 / datanode.getCapacity(); 399 if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) { 400 assertTrue(nodeUtilization == 0); 401 actualExcludedNodeCount++; 402 continue; 403 } 404 if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) { 405 assertTrue(nodeUtilization == 0); 406 actualExcludedNodeCount++; 407 continue; 408 } 409 if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) { 410 balanced = false; 411 if (Time.monotonicNow() > failtime) { 412 throw new TimeoutException( 413 "Rebalancing expected avg utilization to become " 414 + avgUtilization + ", but on datanode " + datanode 415 + " it remains at " + nodeUtilization 416 + " after more than " + TIMEOUT + " msec."); 417 } 418 try { 419 Thread.sleep(100); 420 } catch (InterruptedException ignored) { 421 } 422 break; 423 } 424 } 425 assertEquals(expectedExcludedNodes,actualExcludedNodeCount); 426 } while (!balanced); 427 } 428 long2String(long[] array)429 String long2String(long[] array) { 430 if (array.length == 0) { 431 return "<empty>"; 432 } 433 StringBuilder b = new StringBuilder("[").append(array[0]); 434 for(int i = 1; i < array.length; i++) { 435 b.append(", ").append(array[i]); 436 } 437 return b.append("]").toString(); 438 } 439 /** 440 * Class which contains information about the 441 * new nodes to be added to the cluster for balancing. 442 */ 443 static abstract class NewNodeInfo { 444 445 Set<String> nodesToBeExcluded = new HashSet<String>(); 446 Set<String> nodesToBeIncluded = new HashSet<String>(); 447 getNames()448 abstract String[] getNames(); getNumberofNewNodes()449 abstract int getNumberofNewNodes(); getNumberofIncludeNodes()450 abstract int getNumberofIncludeNodes(); getNumberofExcludeNodes()451 abstract int getNumberofExcludeNodes(); 452 getNodesToBeIncluded()453 public Set<String> getNodesToBeIncluded() { 454 return nodesToBeIncluded; 455 } getNodesToBeExcluded()456 public Set<String> getNodesToBeExcluded() { 457 return nodesToBeExcluded; 458 } 459 } 460 461 /** 462 * The host names of new nodes are specified 463 */ 464 static class HostNameBasedNodes extends NewNodeInfo { 465 String[] hostnames; 466 HostNameBasedNodes(String[] hostnames, Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded)467 public HostNameBasedNodes(String[] hostnames, 468 Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) { 469 this.hostnames = hostnames; 470 this.nodesToBeExcluded = nodesToBeExcluded; 471 this.nodesToBeIncluded = nodesToBeIncluded; 472 } 473 474 @Override getNames()475 String[] getNames() { 476 return hostnames; 477 } 478 @Override getNumberofNewNodes()479 int getNumberofNewNodes() { 480 return hostnames.length; 481 } 482 @Override getNumberofIncludeNodes()483 int getNumberofIncludeNodes() { 484 return nodesToBeIncluded.size(); 485 } 486 @Override getNumberofExcludeNodes()487 int getNumberofExcludeNodes() { 488 return nodesToBeExcluded.size(); 489 } 490 } 491 492 /** 493 * The number of data nodes to be started are specified. 494 * The data nodes will have same host name, but different port numbers. 495 * 496 */ 497 static class PortNumberBasedNodes extends NewNodeInfo { 498 int newNodes; 499 int excludeNodes; 500 int includeNodes; 501 PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes)502 public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) { 503 this.newNodes = newNodes; 504 this.excludeNodes = excludeNodes; 505 this.includeNodes = includeNodes; 506 } 507 508 @Override getNames()509 String[] getNames() { 510 return null; 511 } 512 @Override getNumberofNewNodes()513 int getNumberofNewNodes() { 514 return newNodes; 515 } 516 @Override getNumberofIncludeNodes()517 int getNumberofIncludeNodes() { 518 return includeNodes; 519 } 520 @Override getNumberofExcludeNodes()521 int getNumberofExcludeNodes() { 522 return excludeNodes; 523 } 524 } 525 doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, boolean useTool)526 private void doTest(Configuration conf, long[] capacities, String[] racks, 527 long newCapacity, String newRack, boolean useTool) throws Exception { 528 doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); 529 } 530 531 /** This test start a cluster with specified number of nodes, 532 * and fills it to be 30% full (with a single file replicated identically 533 * to all datanodes); 534 * It then adds one new empty node and starts balancing. 535 * 536 * @param conf - configuration 537 * @param capacities - array of capacities of original nodes in cluster 538 * @param racks - array of racks for original nodes in cluster 539 * @param newCapacity - new node's capacity 540 * @param newRack - new node's rack 541 * @param nodes - information about new nodes to be started. 542 * @param useTool - if true run test via Cli with command-line argument 543 * parsing, etc. Otherwise invoke balancer API directly. 544 * @param useFile - if true, the hosts to included or excluded will be stored in a 545 * file and then later read from the file. 546 * @throws Exception 547 */ doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile)548 private void doTest(Configuration conf, long[] capacities, 549 String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, 550 boolean useTool, boolean useFile) throws Exception { 551 LOG.info("capacities = " + long2String(capacities)); 552 LOG.info("racks = " + Arrays.asList(racks)); 553 LOG.info("newCapacity= " + newCapacity); 554 LOG.info("newRack = " + newRack); 555 LOG.info("useTool = " + useTool); 556 assertEquals(capacities.length, racks.length); 557 int numOfDatanodes = capacities.length; 558 cluster = new MiniDFSCluster.Builder(conf) 559 .numDataNodes(capacities.length) 560 .racks(racks) 561 .simulatedCapacities(capacities) 562 .build(); 563 try { 564 cluster.waitActive(); 565 client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), 566 ClientProtocol.class).getProxy(); 567 568 long totalCapacity = sum(capacities); 569 570 // fill up the cluster to be 30% full 571 long totalUsedSpace = totalCapacity*3/10; 572 createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, 573 (short) numOfDatanodes, 0); 574 575 if (nodes == null) { // there is no specification of new nodes. 576 // start up an empty node with the same capacity and on the same rack 577 cluster.startDataNodes(conf, 1, true, null, 578 new String[]{newRack}, null,new long[]{newCapacity}); 579 totalCapacity += newCapacity; 580 } else { 581 //if running a test with "include list", include original nodes as well 582 if (nodes.getNumberofIncludeNodes()>0) { 583 for (DataNode dn: cluster.getDataNodes()) 584 nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName()); 585 } 586 String[] newRacks = new String[nodes.getNumberofNewNodes()]; 587 long[] newCapacities = new long[nodes.getNumberofNewNodes()]; 588 for (int i=0; i < nodes.getNumberofNewNodes(); i++) { 589 newRacks[i] = newRack; 590 newCapacities[i] = newCapacity; 591 } 592 // if host names are specified for the new nodes to be created. 593 if (nodes.getNames() != null) { 594 cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, 595 newRacks, nodes.getNames(), newCapacities); 596 totalCapacity += newCapacity*nodes.getNumberofNewNodes(); 597 } else { // host names are not specified 598 cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, 599 newRacks, null, newCapacities); 600 totalCapacity += newCapacity*nodes.getNumberofNewNodes(); 601 //populate the include nodes 602 if (nodes.getNumberofIncludeNodes() > 0) { 603 int totalNodes = cluster.getDataNodes().size(); 604 for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) { 605 nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get( 606 totalNodes-1-i).getDatanodeId().getXferAddr()); 607 } 608 } 609 //polulate the exclude nodes 610 if (nodes.getNumberofExcludeNodes() > 0) { 611 int totalNodes = cluster.getDataNodes().size(); 612 for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) { 613 nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get( 614 totalNodes-1-i).getDatanodeId().getXferAddr()); 615 } 616 } 617 } 618 } 619 // run balancer and validate results 620 Balancer.Parameters p = Balancer.Parameters.DEFAULT; 621 if (nodes != null) { 622 p = new Balancer.Parameters( 623 Balancer.Parameters.DEFAULT.policy, 624 Balancer.Parameters.DEFAULT.threshold, 625 Balancer.Parameters.DEFAULT.maxIdleIteration, 626 nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); 627 } 628 629 int expectedExcludedNodes = 0; 630 if (nodes != null) { 631 if (!nodes.getNodesToBeExcluded().isEmpty()) { 632 expectedExcludedNodes = nodes.getNodesToBeExcluded().size(); 633 } else if (!nodes.getNodesToBeIncluded().isEmpty()) { 634 expectedExcludedNodes = 635 cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size(); 636 } 637 } 638 639 // run balancer and validate results 640 if (useTool) { 641 runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes); 642 } else { 643 runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); 644 } 645 } finally { 646 cluster.shutdown(); 647 } 648 } 649 runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity)650 private void runBalancer(Configuration conf, 651 long totalUsedSpace, long totalCapacity) throws Exception { 652 runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0); 653 } 654 runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity, Balancer.Parameters p, int excludedNodes)655 private void runBalancer(Configuration conf, 656 long totalUsedSpace, long totalCapacity, Balancer.Parameters p, 657 int excludedNodes) throws Exception { 658 waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); 659 660 // start rebalancing 661 Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); 662 final int r = runBalancer(namenodes, p, conf); 663 if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 664 DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { 665 assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); 666 return; 667 } else { 668 assertEquals(ExitStatus.SUCCESS.getExitCode(), r); 669 } 670 waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); 671 LOG.info(" ."); 672 waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); 673 } 674 runBalancer(Collection<URI> namenodes, final Parameters p, Configuration conf)675 private static int runBalancer(Collection<URI> namenodes, final Parameters p, 676 Configuration conf) throws IOException, InterruptedException { 677 final long sleeptime = 678 conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 679 DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 + 680 conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 681 DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; 682 LOG.info("namenodes = " + namenodes); 683 LOG.info("parameters = " + p); 684 LOG.info("Print stack trace", new Throwable()); 685 686 System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); 687 688 List<NameNodeConnector> connectors = Collections.emptyList(); 689 try { 690 connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 691 Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf, 692 Balancer.Parameters.DEFAULT.maxIdleIteration); 693 694 boolean done = false; 695 for(int iteration = 0; !done; iteration++) { 696 done = true; 697 Collections.shuffle(connectors); 698 for(NameNodeConnector nnc : connectors) { 699 final Balancer b = new Balancer(nnc, p, conf); 700 final Result r = b.runOneIteration(); 701 r.print(iteration, System.out); 702 703 // clean all lists 704 b.resetData(conf); 705 if (r.exitStatus == ExitStatus.IN_PROGRESS) { 706 done = false; 707 } else if (r.exitStatus != ExitStatus.SUCCESS) { 708 //must be an error statue, return. 709 return r.exitStatus.getExitCode(); 710 } else { 711 if (iteration > 0) { 712 assertTrue(r.bytesAlreadyMoved > 0); 713 } 714 } 715 } 716 717 if (!done) { 718 Thread.sleep(sleeptime); 719 } 720 } 721 } finally { 722 for(NameNodeConnector nnc : connectors) { 723 IOUtils.cleanup(LOG, nnc); 724 } 725 } 726 return ExitStatus.SUCCESS.getExitCode(); 727 } 728 runBalancerCli(Configuration conf, long totalUsedSpace, long totalCapacity, Balancer.Parameters p, boolean useFile, int expectedExcludedNodes)729 private void runBalancerCli(Configuration conf, 730 long totalUsedSpace, long totalCapacity, 731 Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { 732 waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); 733 List <String> args = new ArrayList<String>(); 734 args.add("-policy"); 735 args.add("datanode"); 736 737 File excludeHostsFile = null; 738 if (!p.nodesToBeExcluded.isEmpty()) { 739 args.add("-exclude"); 740 if (useFile) { 741 excludeHostsFile = new File ("exclude-hosts-file"); 742 PrintWriter pw = new PrintWriter(excludeHostsFile); 743 for (String host: p.nodesToBeExcluded) { 744 pw.write( host + "\n"); 745 } 746 pw.close(); 747 args.add("-f"); 748 args.add("exclude-hosts-file"); 749 } else { 750 args.add(StringUtils.join(p.nodesToBeExcluded, ',')); 751 } 752 } 753 754 File includeHostsFile = null; 755 if (!p.nodesToBeIncluded.isEmpty()) { 756 args.add("-include"); 757 if (useFile) { 758 includeHostsFile = new File ("include-hosts-file"); 759 PrintWriter pw = new PrintWriter(includeHostsFile); 760 for (String host: p.nodesToBeIncluded){ 761 pw.write( host + "\n"); 762 } 763 pw.close(); 764 args.add("-f"); 765 args.add("include-hosts-file"); 766 } else { 767 args.add(StringUtils.join(p.nodesToBeIncluded, ',')); 768 } 769 } 770 771 final Tool tool = new Cli(); 772 tool.setConf(conf); 773 final int r = tool.run(args.toArray(new String[0])); // start rebalancing 774 775 assertEquals("Tools should exit 0 on success", 0, r); 776 waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); 777 LOG.info("Rebalancing with default ctor."); 778 waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes); 779 780 if (excludeHostsFile != null && excludeHostsFile.exists()) { 781 excludeHostsFile.delete(); 782 } 783 if (includeHostsFile != null && includeHostsFile.exists()) { 784 includeHostsFile.delete(); 785 } 786 } 787 788 /** one-node cluster test*/ oneNodeTest(Configuration conf, boolean useTool)789 private void oneNodeTest(Configuration conf, boolean useTool) throws Exception { 790 // add an empty node with half of the CAPACITY & the same rack 791 doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, 792 RACK0, useTool); 793 } 794 795 /** two-node cluster test */ twoNodeTest(Configuration conf)796 private void twoNodeTest(Configuration conf) throws Exception { 797 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 798 CAPACITY, RACK2, false); 799 } 800 801 /** test using a user-supplied conf */ integrationTest(Configuration conf)802 public void integrationTest(Configuration conf) throws Exception { 803 initConf(conf); 804 oneNodeTest(conf, false); 805 } 806 807 /* we first start a cluster and fill the cluster up to a certain size. 808 * then redistribute blocks according the required distribution. 809 * Then we start an empty datanode. 810 * Afterwards a balancer is run to balance the cluster. 811 * A partially filled datanode is excluded during balancing. 812 * This triggers a situation where one of the block's location is unknown. 813 */ 814 @Test(timeout=100000) testUnknownDatanode()815 public void testUnknownDatanode() throws Exception { 816 Configuration conf = new HdfsConfiguration(); 817 initConf(conf); 818 long distribution[] = new long[] {50*CAPACITY/100, 70*CAPACITY/100, 0*CAPACITY/100}; 819 long capacities[] = new long[]{CAPACITY, CAPACITY, CAPACITY}; 820 String racks[] = new String[] {RACK0, RACK1, RACK1}; 821 822 int numDatanodes = distribution.length; 823 if (capacities.length != numDatanodes || racks.length != numDatanodes) { 824 throw new IllegalArgumentException("Array length is not the same"); 825 } 826 827 // calculate total space that need to be filled 828 final long totalUsedSpace = sum(distribution); 829 830 // fill the cluster 831 ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace, 832 (short) numDatanodes); 833 834 // redistribute blocks 835 Block[][] blocksDN = distributeBlocks( 836 blocks, (short)(numDatanodes-1), distribution); 837 838 // restart the cluster: do NOT format the cluster 839 conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); 840 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) 841 .format(false) 842 .racks(racks) 843 .simulatedCapacities(capacities) 844 .build(); 845 try { 846 cluster.waitActive(); 847 client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), 848 ClientProtocol.class).getProxy(); 849 850 for(int i = 0; i < 3; i++) { 851 cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); 852 } 853 854 cluster.startDataNodes(conf, 1, true, null, 855 new String[]{RACK0}, null,new long[]{CAPACITY}); 856 cluster.triggerHeartbeats(); 857 858 Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); 859 Set<String> datanodes = new HashSet<String>(); 860 datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); 861 Balancer.Parameters p = new Balancer.Parameters( 862 Balancer.Parameters.DEFAULT.policy, 863 Balancer.Parameters.DEFAULT.threshold, 864 Balancer.Parameters.DEFAULT.maxIdleIteration, 865 datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); 866 final int r = Balancer.run(namenodes, p, conf); 867 assertEquals(ExitStatus.SUCCESS.getExitCode(), r); 868 } finally { 869 cluster.shutdown(); 870 } 871 } 872 873 /** 874 * Test parse method in Balancer#Cli class with threshold value out of 875 * boundaries. 876 */ 877 @Test(timeout=100000) testBalancerCliParseWithThresholdOutOfBoundaries()878 public void testBalancerCliParseWithThresholdOutOfBoundaries() { 879 String parameters[] = new String[] { "-threshold", "0" }; 880 String reason = "IllegalArgumentException is expected when threshold value" 881 + " is out of boundary."; 882 try { 883 Balancer.Cli.parse(parameters); 884 fail(reason); 885 } catch (IllegalArgumentException e) { 886 assertEquals("Number out of range: threshold = 0.0", e.getMessage()); 887 } 888 parameters = new String[] { "-threshold", "101" }; 889 try { 890 Balancer.Cli.parse(parameters); 891 fail(reason); 892 } catch (IllegalArgumentException e) { 893 assertEquals("Number out of range: threshold = 101.0", e.getMessage()); 894 } 895 } 896 897 /** Test a cluster with even distribution, 898 * then a new empty node is added to the cluster*/ 899 @Test(timeout=100000) testBalancer0()900 public void testBalancer0() throws Exception { 901 testBalancer0Internal(new HdfsConfiguration()); 902 } 903 testBalancer0Internal(Configuration conf)904 void testBalancer0Internal(Configuration conf) throws Exception { 905 initConf(conf); 906 oneNodeTest(conf, false); 907 twoNodeTest(conf); 908 } 909 910 /** Test unevenly distributed cluster */ 911 @Test(timeout=100000) testBalancer1()912 public void testBalancer1() throws Exception { 913 testBalancer1Internal(new HdfsConfiguration()); 914 } 915 testBalancer1Internal(Configuration conf)916 void testBalancer1Internal(Configuration conf) throws Exception { 917 initConf(conf); 918 testUnevenDistribution(conf, 919 new long[] {50*CAPACITY/100, 10*CAPACITY/100}, 920 new long[]{CAPACITY, CAPACITY}, 921 new String[] {RACK0, RACK1}); 922 } 923 924 @Test(timeout=100000) testBalancerWithZeroThreadsForMove()925 public void testBalancerWithZeroThreadsForMove() throws Exception { 926 Configuration conf = new HdfsConfiguration(); 927 conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0); 928 testBalancer1Internal (conf); 929 } 930 931 @Test(timeout=100000) testBalancerWithNonZeroThreadsForMove()932 public void testBalancerWithNonZeroThreadsForMove() throws Exception { 933 Configuration conf = new HdfsConfiguration(); 934 conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8); 935 testBalancer1Internal (conf); 936 } 937 938 @Test(timeout=100000) testBalancer2()939 public void testBalancer2() throws Exception { 940 testBalancer2Internal(new HdfsConfiguration()); 941 } 942 testBalancer2Internal(Configuration conf)943 void testBalancer2Internal(Configuration conf) throws Exception { 944 initConf(conf); 945 testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY }, 946 new String[] { RACK0, RACK1 }, CAPACITY, RACK2); 947 } 948 testBalancerDefaultConstructor(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack)949 private void testBalancerDefaultConstructor(Configuration conf, 950 long[] capacities, String[] racks, long newCapacity, String newRack) 951 throws Exception { 952 int numOfDatanodes = capacities.length; 953 assertEquals(numOfDatanodes, racks.length); 954 cluster = new MiniDFSCluster.Builder(conf) 955 .numDataNodes(capacities.length) 956 .racks(racks) 957 .simulatedCapacities(capacities) 958 .build(); 959 try { 960 cluster.waitActive(); 961 client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), 962 ClientProtocol.class).getProxy(); 963 964 long totalCapacity = sum(capacities); 965 966 // fill up the cluster to be 30% full 967 long totalUsedSpace = totalCapacity * 3 / 10; 968 createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, 969 (short) numOfDatanodes, 0); 970 // start up an empty node with the same capacity and on the same rack 971 cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, 972 new long[] { newCapacity }); 973 974 totalCapacity += newCapacity; 975 976 // run balancer and validate results 977 runBalancer(conf, totalUsedSpace, totalCapacity); 978 } finally { 979 cluster.shutdown(); 980 } 981 } 982 983 /** 984 * Verify balancer exits 0 on success. 985 */ 986 @Test(timeout=100000) testExitZeroOnSuccess()987 public void testExitZeroOnSuccess() throws Exception { 988 final Configuration conf = new HdfsConfiguration(); 989 990 initConf(conf); 991 992 oneNodeTest(conf, true); 993 } 994 995 /** 996 * Test parse method in Balancer#Cli class with wrong number of params 997 */ 998 999 @Test testBalancerCliParseWithWrongParams()1000 public void testBalancerCliParseWithWrongParams() { 1001 String parameters[] = new String[] { "-threshold" }; 1002 String reason = 1003 "IllegalArgumentException is expected when value is not specified"; 1004 try { 1005 Balancer.Cli.parse(parameters); 1006 fail(reason); 1007 } catch (IllegalArgumentException e) { 1008 1009 } 1010 parameters = new String[] { "-policy" }; 1011 try { 1012 Balancer.Cli.parse(parameters); 1013 fail(reason); 1014 } catch (IllegalArgumentException e) { 1015 1016 } 1017 parameters = new String[] {"-threshold", "1", "-policy"}; 1018 try { 1019 Balancer.Cli.parse(parameters); 1020 fail(reason); 1021 } catch (IllegalArgumentException e) { 1022 1023 } 1024 parameters = new String[] {"-threshold", "1", "-include"}; 1025 try { 1026 Balancer.Cli.parse(parameters); 1027 fail(reason); 1028 } catch (IllegalArgumentException e) { 1029 1030 } 1031 parameters = new String[] {"-threshold", "1", "-exclude"}; 1032 try { 1033 Balancer.Cli.parse(parameters); 1034 fail(reason); 1035 } catch (IllegalArgumentException e) { 1036 1037 } 1038 parameters = new String[] {"-include", "-f"}; 1039 try { 1040 Balancer.Cli.parse(parameters); 1041 fail(reason); 1042 } catch (IllegalArgumentException e) { 1043 1044 } 1045 parameters = new String[] {"-exclude", "-f"}; 1046 try { 1047 Balancer.Cli.parse(parameters); 1048 fail(reason); 1049 } catch (IllegalArgumentException e) { 1050 1051 } 1052 1053 parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"}; 1054 try { 1055 Balancer.Cli.parse(parameters); 1056 fail("IllegalArgumentException is expected when both -exclude and -include are specified"); 1057 } catch (IllegalArgumentException e) { 1058 1059 } 1060 } 1061 1062 1063 /** 1064 * Test a cluster with even distribution, 1065 * then three nodes are added to the cluster, 1066 * runs balancer with two of the nodes in the exclude list 1067 */ 1068 @Test(timeout=100000) testBalancerWithExcludeList()1069 public void testBalancerWithExcludeList() throws Exception { 1070 final Configuration conf = new HdfsConfiguration(); 1071 initConf(conf); 1072 Set<String> excludeHosts = new HashSet<String>(); 1073 excludeHosts.add( "datanodeY"); 1074 excludeHosts.add( "datanodeZ"); 1075 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, 1076 new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, 1077 excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false); 1078 } 1079 1080 /** 1081 * Test a cluster with even distribution, 1082 * then three nodes are added to the cluster, 1083 * runs balancer with two of the nodes in the exclude list 1084 */ 1085 @Test(timeout=100000) testBalancerWithExcludeListWithPorts()1086 public void testBalancerWithExcludeListWithPorts() throws Exception { 1087 final Configuration conf = new HdfsConfiguration(); 1088 initConf(conf); 1089 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 1090 CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false); 1091 } 1092 1093 /** 1094 * Test a cluster with even distribution, 1095 * then three nodes are added to the cluster, 1096 * runs balancer with two of the nodes in the exclude list 1097 */ 1098 @Test(timeout=100000) testBalancerCliWithExcludeList()1099 public void testBalancerCliWithExcludeList() throws Exception { 1100 final Configuration conf = new HdfsConfiguration(); 1101 initConf(conf); 1102 Set<String> excludeHosts = new HashSet<String>(); 1103 excludeHosts.add( "datanodeY"); 1104 excludeHosts.add( "datanodeZ"); 1105 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, 1106 new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts, 1107 Parameters.DEFAULT.nodesToBeIncluded), true, false); 1108 } 1109 1110 /** 1111 * Test a cluster with even distribution, 1112 * then three nodes are added to the cluster, 1113 * runs balancer with two of the nodes in the exclude list 1114 */ 1115 @Test(timeout=100000) testBalancerCliWithExcludeListWithPorts()1116 public void testBalancerCliWithExcludeListWithPorts() throws Exception { 1117 final Configuration conf = new HdfsConfiguration(); 1118 initConf(conf); 1119 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 1120 CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false); 1121 } 1122 1123 /** 1124 * Test a cluster with even distribution, 1125 * then three nodes are added to the cluster, 1126 * runs balancer with two of the nodes in the exclude list in a file 1127 */ 1128 @Test(timeout=100000) testBalancerCliWithExcludeListInAFile()1129 public void testBalancerCliWithExcludeListInAFile() throws Exception { 1130 final Configuration conf = new HdfsConfiguration(); 1131 initConf(conf); 1132 Set<String> excludeHosts = new HashSet<String>(); 1133 excludeHosts.add( "datanodeY"); 1134 excludeHosts.add( "datanodeZ"); 1135 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, 1136 new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, 1137 excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true); 1138 } 1139 1140 /** 1141 * Test a cluster with even distribution,G 1142 * then three nodes are added to the cluster, 1143 * runs balancer with two of the nodes in the exclude list 1144 */ 1145 @Test(timeout=100000) testBalancerCliWithExcludeListWithPortsInAFile()1146 public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception { 1147 final Configuration conf = new HdfsConfiguration(); 1148 initConf(conf); 1149 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 1150 CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true); 1151 } 1152 1153 /** 1154 * Test a cluster with even distribution, 1155 * then three nodes are added to the cluster, 1156 * runs balancer with two of the nodes in the include list 1157 */ 1158 @Test(timeout=100000) testBalancerWithIncludeList()1159 public void testBalancerWithIncludeList() throws Exception { 1160 final Configuration conf = new HdfsConfiguration(); 1161 initConf(conf); 1162 Set<String> includeHosts = new HashSet<String>(); 1163 includeHosts.add( "datanodeY"); 1164 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, 1165 new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, 1166 Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false); 1167 } 1168 1169 /** 1170 * Test a cluster with even distribution, 1171 * then three nodes are added to the cluster, 1172 * runs balancer with two of the nodes in the include list 1173 */ 1174 @Test(timeout=100000) testBalancerWithIncludeListWithPorts()1175 public void testBalancerWithIncludeListWithPorts() throws Exception { 1176 final Configuration conf = new HdfsConfiguration(); 1177 initConf(conf); 1178 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 1179 CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false); 1180 } 1181 1182 /** 1183 * Test a cluster with even distribution, 1184 * then three nodes are added to the cluster, 1185 * runs balancer with two of the nodes in the include list 1186 */ 1187 @Test(timeout=100000) testBalancerCliWithIncludeList()1188 public void testBalancerCliWithIncludeList() throws Exception { 1189 final Configuration conf = new HdfsConfiguration(); 1190 initConf(conf); 1191 Set<String> includeHosts = new HashSet<String>(); 1192 includeHosts.add( "datanodeY"); 1193 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, 1194 new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, 1195 Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false); 1196 } 1197 1198 /** 1199 * Test a cluster with even distribution, 1200 * then three nodes are added to the cluster, 1201 * runs balancer with two of the nodes in the include list 1202 */ 1203 @Test(timeout=100000) testBalancerCliWithIncludeListWithPorts()1204 public void testBalancerCliWithIncludeListWithPorts() throws Exception { 1205 final Configuration conf = new HdfsConfiguration(); 1206 initConf(conf); 1207 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 1208 CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false); 1209 } 1210 1211 /** 1212 * Test a cluster with even distribution, 1213 * then three nodes are added to the cluster, 1214 * runs balancer with two of the nodes in the include list 1215 */ 1216 @Test(timeout=100000) testBalancerCliWithIncludeListInAFile()1217 public void testBalancerCliWithIncludeListInAFile() throws Exception { 1218 final Configuration conf = new HdfsConfiguration(); 1219 initConf(conf); 1220 Set<String> includeHosts = new HashSet<String>(); 1221 includeHosts.add( "datanodeY"); 1222 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, 1223 new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, 1224 Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true); 1225 } 1226 1227 /** 1228 * Test a cluster with even distribution, 1229 * then three nodes are added to the cluster, 1230 * runs balancer with two of the nodes in the include list 1231 */ 1232 @Test(timeout=100000) testBalancerCliWithIncludeListWithPortsInAFile()1233 public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception { 1234 final Configuration conf = new HdfsConfiguration(); 1235 initConf(conf); 1236 doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, 1237 CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); 1238 } 1239 1240 /* 1241 * Test Balancer with Ram_Disk configured 1242 * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK. 1243 * Then verify that the balancer does not migrate files on RAM_DISK across DN. 1244 */ 1245 @Test(timeout=300000) testBalancerWithRamDisk()1246 public void testBalancerWithRamDisk() throws Exception { 1247 final int SEED = 0xFADED; 1248 final short REPL_FACT = 1; 1249 Configuration conf = new Configuration(); 1250 initConfWithRamDisk(conf); 1251 1252 final int defaultRamDiskCapacity = 10; 1253 final long ramDiskStorageLimit = 1254 ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + 1255 (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); 1256 final long diskStorageLimit = 1257 ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + 1258 (DEFAULT_RAM_DISK_BLOCK_SIZE - 1); 1259 1260 cluster = new MiniDFSCluster 1261 .Builder(conf) 1262 .numDataNodes(1) 1263 .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit }) 1264 .storageTypes(new StorageType[] { RAM_DISK, DEFAULT }) 1265 .build(); 1266 1267 try { 1268 cluster.waitActive(); 1269 // Create few files on RAM_DISK 1270 final String METHOD_NAME = GenericTestUtils.getMethodName(); 1271 final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); 1272 final Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); 1273 1274 DistributedFileSystem fs = cluster.getFileSystem(); 1275 DFSClient client = fs.getClient(); 1276 DFSTestUtil.createFile(fs, path1, true, 1277 DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE, 1278 DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); 1279 DFSTestUtil.createFile(fs, path2, true, 1280 DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE, 1281 DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); 1282 1283 // Sleep for a short time to allow the lazy writer thread to do its job 1284 Thread.sleep(6 * 1000); 1285 1286 // Add another fresh DN with the same type/capacity without files on RAM_DISK 1287 StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}}; 1288 long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}}; 1289 cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null, 1290 null, null, storageCapacities, null, false, false, false, null); 1291 1292 cluster.triggerHeartbeats(); 1293 Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); 1294 1295 // Run Balancer 1296 Balancer.Parameters p = new Balancer.Parameters( 1297 Parameters.DEFAULT.policy, 1298 Parameters.DEFAULT.threshold, 1299 Balancer.Parameters.DEFAULT.maxIdleIteration, 1300 Parameters.DEFAULT.nodesToBeExcluded, 1301 Parameters.DEFAULT.nodesToBeIncluded); 1302 final int r = Balancer.run(namenodes, p, conf); 1303 1304 // Validate no RAM_DISK block should be moved 1305 assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); 1306 1307 // Verify files are still on RAM_DISK 1308 DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK); 1309 DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK); 1310 } finally { 1311 cluster.shutdown(); 1312 } 1313 } 1314 1315 /** 1316 * Test special case. Two replicas belong to same block should not in same node. 1317 * We have 2 nodes. 1318 * We have a block in (DN0,SSD) and (DN1,DISK). 1319 * Replica in (DN0,SSD) should not be moved to (DN1,SSD). 1320 * Otherwise DN1 has 2 replicas. 1321 */ 1322 @Test(timeout=100000) testTwoReplicaShouldNotInSameDN()1323 public void testTwoReplicaShouldNotInSameDN() throws Exception { 1324 final Configuration conf = new HdfsConfiguration(); 1325 1326 int blockSize = 5 * 1024 * 1024 ; 1327 conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); 1328 conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); 1329 conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); 1330 1331 int numOfDatanodes =2; 1332 final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) 1333 .numDataNodes(2) 1334 .racks(new String[]{"/default/rack0", "/default/rack0"}) 1335 .storagesPerDatanode(2) 1336 .storageTypes(new StorageType[][]{ 1337 {StorageType.SSD, StorageType.DISK}, 1338 {StorageType.SSD, StorageType.DISK}}) 1339 .storageCapacities(new long[][]{ 1340 {100 * blockSize, 20 * blockSize}, 1341 {20 * blockSize, 100 * blockSize}}) 1342 .build(); 1343 1344 try { 1345 cluster.waitActive(); 1346 1347 //set "/bar" directory with ONE_SSD storage policy. 1348 DistributedFileSystem fs = cluster.getFileSystem(); 1349 Path barDir = new Path("/bar"); 1350 fs.mkdir(barDir,new FsPermission((short)777)); 1351 fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); 1352 1353 // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full, 1354 // and (DN0,SSD) and (DN1,DISK) are about 15% full. 1355 long fileLen = 30 * blockSize; 1356 // fooFile has ONE_SSD policy. So 1357 // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block. 1358 // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block. 1359 Path fooFile = new Path(barDir, "foo"); 1360 createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0); 1361 // update space info 1362 cluster.triggerHeartbeats(); 1363 1364 Balancer.Parameters p = Balancer.Parameters.DEFAULT; 1365 Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); 1366 final int r = Balancer.run(namenodes, p, conf); 1367 1368 // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK) 1369 // already has one. Otherwise DN1 will have 2 replicas. 1370 // For same reason, no replicas were moved. 1371 assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); 1372 1373 } finally { 1374 cluster.shutdown(); 1375 } 1376 } 1377 1378 /** 1379 * Test running many balancer simultaneously. 1380 * 1381 * Case-1: First balancer is running. Now, running second one should get 1382 * "Another balancer is running. Exiting.." IOException and fail immediately 1383 * 1384 * Case-2: When running second balancer 'balancer.id' file exists but the 1385 * lease doesn't exists. Now, the second balancer should run successfully. 1386 */ 1387 @Test(timeout = 100000) testManyBalancerSimultaneously()1388 public void testManyBalancerSimultaneously() throws Exception { 1389 final Configuration conf = new HdfsConfiguration(); 1390 initConf(conf); 1391 // add an empty node with half of the capacities(4 * CAPACITY) & the same 1392 // rack 1393 long[] capacities = new long[] { 4 * CAPACITY }; 1394 String[] racks = new String[] { RACK0 }; 1395 long newCapacity = 2 * CAPACITY; 1396 String newRack = RACK0; 1397 LOG.info("capacities = " + long2String(capacities)); 1398 LOG.info("racks = " + Arrays.asList(racks)); 1399 LOG.info("newCapacity= " + newCapacity); 1400 LOG.info("newRack = " + newRack); 1401 LOG.info("useTool = " + false); 1402 assertEquals(capacities.length, racks.length); 1403 int numOfDatanodes = capacities.length; 1404 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) 1405 .racks(racks).simulatedCapacities(capacities).build(); 1406 try { 1407 cluster.waitActive(); 1408 client = NameNodeProxies.createProxy(conf, 1409 cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); 1410 1411 long totalCapacity = sum(capacities); 1412 1413 // fill up the cluster to be 30% full 1414 final long totalUsedSpace = totalCapacity * 3 / 10; 1415 createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, 1416 (short) numOfDatanodes, 0); 1417 // start up an empty node with the same capacity and on the same rack 1418 cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, 1419 new long[] { newCapacity }); 1420 1421 // Case1: Simulate first balancer by creating 'balancer.id' file. It 1422 // will keep this file until the balancing operation is completed. 1423 FileSystem fs = cluster.getFileSystem(0); 1424 final FSDataOutputStream out = fs 1425 .create(Balancer.BALANCER_ID_PATH, false); 1426 out.writeBytes(InetAddress.getLocalHost().getHostName()); 1427 out.hflush(); 1428 assertTrue("'balancer.id' file doesn't exist!", 1429 fs.exists(Balancer.BALANCER_ID_PATH)); 1430 1431 // start second balancer 1432 final String[] args = { "-policy", "datanode" }; 1433 final Tool tool = new Cli(); 1434 tool.setConf(conf); 1435 int exitCode = tool.run(args); // start balancing 1436 assertEquals("Exit status code mismatches", 1437 ExitStatus.IO_EXCEPTION.getExitCode(), exitCode); 1438 1439 // Case2: Release lease so that another balancer would be able to 1440 // perform balancing. 1441 out.close(); 1442 assertTrue("'balancer.id' file doesn't exist!", 1443 fs.exists(Balancer.BALANCER_ID_PATH)); 1444 exitCode = tool.run(args); // start balancing 1445 assertEquals("Exit status code mismatches", 1446 ExitStatus.SUCCESS.getExitCode(), exitCode); 1447 } finally { 1448 cluster.shutdown(); 1449 } 1450 } 1451 1452 /** 1453 * @param args 1454 */ main(String[] args)1455 public static void main(String[] args) throws Exception { 1456 TestBalancer balancerTest = new TestBalancer(); 1457 balancerTest.testBalancer0(); 1458 balancerTest.testBalancer1(); 1459 balancerTest.testBalancer2(); 1460 } 1461 } 1462