/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.balancer;

import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
import org.junit.Test;
/**
 * This class tests if a balancer schedules tasks correctly.
 */
public class TestBalancer {
  private static final Log LOG = LogFactory.getLog(TestBalancer.class);

  static {
    ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
  }

  final static long CAPACITY = 5000L;
  final static String RACK0 = "/rack0";
  final static String RACK1 = "/rack1";
  final static String RACK2 = "/rack2";
  private final static String fileName = "/tmp.txt";
  final static Path filePath = new Path(fileName);
  private MiniDFSCluster cluster;

  ClientProtocol client;

  static final long TIMEOUT = 40000L; // msec
  static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
  static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10% + delta
  static final int DEFAULT_BLOCK_SIZE = 100;
  static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
  private static final Random r = new Random();
  static {
    initTestSetup();
  }

  public static void initTestSetup() {
    Dispatcher.setBlockMoveWaitTime(1000L);

    // do not create the id file since it occupies disk space
    NameNodeConnector.setWrite2IdFile(false);
  }

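  /** Initialize <code>conf</code> with small blocks, frequent heartbeats and a
   *  {@link SimulatedFSDataset}, so balancer tests run quickly. */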
  static void initConf(Configuration conf) {
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
    SimulatedFSDataset.setFactory(conf);
    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
  }

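  /** Initialize <code>conf</code> for the RAM_DISK tests: larger blocks plus
   *  aggressive lazy-writer and scrubber intervals. */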
  static void initConfWithRamDisk(Configuration conf) {
    conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
  }

  /* create a file with a length of <code>fileLen</code> */
  static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
      short replicationFactor, int nnIndex)
  throws IOException, InterruptedException, TimeoutException {
    FileSystem fs = cluster.getFileSystem(nnIndex);
    DFSTestUtil.createFile(fs, filePath, fileLen,
        replicationFactor, r.nextLong());
    DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
  }

  /* fill up a cluster with <code>numNodes</code> datanodes
   * whose used space is <code>size</code>
   */
  private ExtendedBlock[] generateBlocks(Configuration conf, long size,
      short numNodes) throws IOException, InterruptedException, TimeoutException {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build();
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      short replicationFactor = (short)(numNodes - 1);
      long fileLen = size / replicationFactor;
      createFile(cluster, filePath, fileLen, replicationFactor, 0);

      List<LocatedBlock> locatedBlocks = client
          .getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();

      int numOfBlocks = locatedBlocks.size();
      ExtendedBlock[] blocks = new ExtendedBlock[numOfBlocks];
      for (int i = 0; i < numOfBlocks; i++) {
        ExtendedBlock b = locatedBlocks.get(i).getBlock();
        blocks[i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
            b.getNumBytes(), b.getGenerationStamp());
      }

      return blocks;
    } finally {
      cluster.shutdown();
    }
  }

  /* Distribute all blocks according to the given distribution */
  static Block[][] distributeBlocks(ExtendedBlock[] blocks,
      short replicationFactor, final long[] distribution) {
    // make a copy
    long[] usedSpace = new long[distribution.length];
    System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);

    List<List<Block>> blockReports =
      new ArrayList<List<Block>>(usedSpace.length);
    Block[][] results = new Block[usedSpace.length][];
    for (int i = 0; i < usedSpace.length; i++) {
      blockReports.add(new ArrayList<Block>());
    }
    for (int i = 0; i < blocks.length; i++) {
      for (int j = 0; j < replicationFactor; j++) {
        boolean notChosen = true;
        while (notChosen) {
          int chosenIndex = r.nextInt(usedSpace.length);
          if (usedSpace[chosenIndex] > 0) {
            notChosen = false;
            blockReports.get(chosenIndex).add(blocks[i].getLocalBlock());
            usedSpace[chosenIndex] -= blocks[i].getNumBytes();
          }
        }
      }
    }
    for (int i = 0; i < usedSpace.length; i++) {
      List<Block> nodeBlockList = blockReports.get(i);
      results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
    }
    return results;
  }

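  /** @return the sum of the entries of <code>x</code> */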
  static long sum(long[] x) {
    long s = 0L;
    for (long a : x) {
      s += a;
    }
    return s;
  }

  /* we first start a cluster and fill the cluster up to a certain size.
   * then redistribute blocks according to the required distribution.
   * Afterwards a balancer is run to balance the cluster.
   */
  private void testUnevenDistribution(Configuration conf,
      long[] distribution, long[] capacities, String[] racks) throws Exception {
    int numDatanodes = distribution.length;
    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
      throw new IllegalArgumentException("Array lengths are not the same");
    }

    // calculate the total space that needs to be filled
    final long totalUsedSpace = sum(distribution);

    // fill the cluster
    ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace,
        (short) numDatanodes);

    // redistribute blocks
    Block[][] blocksDN = distributeBlocks(
        blocks, (short)(numDatanodes - 1), distribution);

    // restart the cluster: do NOT format the cluster
    conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
                                              .format(false)
                                              .racks(racks)
                                              .simulatedCapacities(capacities)
                                              .build();
    cluster.waitActive();
    client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
        ClientProtocol.class).getProxy();

    for (int i = 0; i < blocksDN.length; i++) {
      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
    }

    final long totalCapacity = sum(capacities);
    runBalancer(conf, totalUsedSpace, totalCapacity);
    cluster.shutdown();
  }

  /**
   * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
   * summed over all nodes.  Times out after TIMEOUT msec.
   * @param expectedUsedSpace the expected used space, summed over all nodes
   * @param expectedTotalSpace the expected total capacity, summed over all nodes
   * @throws IOException if getStats() fails
   * @throws TimeoutException if the expected values are not reached in time
   */
  static void waitForHeartBeat(long expectedUsedSpace,
      long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster)
  throws IOException, TimeoutException {
    long timeout = TIMEOUT;
    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
             : Time.monotonicNow() + timeout;

    while (true) {
      long[] status = client.getStats();
      double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
          / expectedTotalSpace;
      double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
          / expectedUsedSpace;
      if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
          && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) {
        break; // done
      }

      if (Time.monotonicNow() > failtime) {
        throw new TimeoutException("Cluster failed to reach expected values of "
            + "totalSpace (current: " + status[0]
            + ", expected: " + expectedTotalSpace
            + "), or usedSpace (current: " + status[1]
            + ", expected: " + expectedUsedSpace
            + "), in more than " + timeout + " msec.");
      }
      try {
        Thread.sleep(100L);
      } catch (InterruptedException ignored) {
      }
    }
  }

  /**
   * Wait until balanced: each datanode gives utilization within
   * BALANCE_ALLOWED_VARIANCE of average
   * @throws IOException
   * @throws TimeoutException
   */
  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
  throws IOException, TimeoutException {
    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
  }

  /**
   * Make sure that the balancer can't move pinned blocks.
   * If favoredNodes are specified when creating a file, its blocks are pinned
   * via the sticky bit.
   * @throws Exception
   */
  @Test(timeout=100000)
  public void testBalancerWithPinnedBlocks() throws Exception {
    // This test assumes the sticky-bit based block pin mechanism available only
    // in Linux/Unix. It can be unblocked on Windows when HDFS-7759 is ready to
    // provide a different mechanism for Windows.
    assumeTrue(!Path.WINDOWS);

    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);

    long[] capacities = new long[] { CAPACITY, CAPACITY };
    String[] racks = { RACK0, RACK1 };
    int numOfDatanodes = capacities.length;

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
      .hosts(new String[]{"localhost", "localhost"})
      .racks(racks).simulatedCapacities(capacities).build();

    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf,
          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();

      // fill up the cluster to be 80% full
      long totalCapacity = sum(capacities);
      long totalUsedSpace = totalCapacity * 8 / 10;
      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
      }

      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
          (short) numOfDatanodes, 0, false, favoredNodes);

      // start up an empty node with the same capacity
      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
          new long[] { CAPACITY });

      totalCapacity += CAPACITY;

      // run balancer and validate results
      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

      // start rebalancing; since all blocks are pinned, nothing can move
      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
      int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Wait until balanced: each datanode gives utilization within
   * BALANCE_ALLOWED_VARIANCE of average
   * @throws IOException
   * @throws TimeoutException
   */
  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
      int expectedExcludedNodes) throws IOException, TimeoutException {
    long timeout = TIMEOUT;
    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
        : Time.monotonicNow() + timeout;
    if (!p.nodesToBeIncluded.isEmpty()) {
      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
    }
    if (!p.nodesToBeExcluded.isEmpty()) {
      totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
    }
    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
    boolean balanced;
    do {
      DatanodeInfo[] datanodeReport =
          client.getDatanodeReport(DatanodeReportType.ALL);
      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
      balanced = true;
      int actualExcludedNodeCount = 0;
      for (DatanodeInfo datanode : datanodeReport) {
        double nodeUtilization = ((double)datanode.getDfsUsed())
            / datanode.getCapacity();
        if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
          assertTrue(nodeUtilization == 0);
          actualExcludedNodeCount++;
          continue;
        }
        if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
          assertTrue(nodeUtilization == 0);
          actualExcludedNodeCount++;
          continue;
        }
        if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
          balanced = false;
          if (Time.monotonicNow() > failtime) {
            throw new TimeoutException(
                "Rebalancing expected avg utilization to become "
                + avgUtilization + ", but on datanode " + datanode
                + " it remains at " + nodeUtilization
                + " after more than " + TIMEOUT + " msec.");
          }
          try {
            Thread.sleep(100);
          } catch (InterruptedException ignored) {
          }
          break;
        }
      }
      assertEquals(expectedExcludedNodes, actualExcludedNodeCount);
    } while (!balanced);
  }

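  /** Pretty-print a long array, e.g. "[1, 2, 3]". */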
  String long2String(long[] array) {
    if (array.length == 0) {
      return "<empty>";
    }
    StringBuilder b = new StringBuilder("[").append(array[0]);
    for (int i = 1; i < array.length; i++) {
      b.append(", ").append(array[i]);
    }
    return b.append("]").toString();
  }

  /**
   * Class which contains information about the
   * new nodes to be added to the cluster for balancing.
   */
  static abstract class NewNodeInfo {

    Set<String> nodesToBeExcluded = new HashSet<String>();
    Set<String> nodesToBeIncluded = new HashSet<String>();

    abstract String[] getNames();
    abstract int getNumberofNewNodes();
    abstract int getNumberofIncludeNodes();
    abstract int getNumberofExcludeNodes();

    public Set<String> getNodesToBeIncluded() {
      return nodesToBeIncluded;
    }
    public Set<String> getNodesToBeExcluded() {
      return nodesToBeExcluded;
    }
  }

  /**
   * The host names of new nodes are specified
   */
  static class HostNameBasedNodes extends NewNodeInfo {
    String[] hostnames;

    public HostNameBasedNodes(String[] hostnames,
        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
      this.hostnames = hostnames;
      this.nodesToBeExcluded = nodesToBeExcluded;
      this.nodesToBeIncluded = nodesToBeIncluded;
    }

    @Override
    String[] getNames() {
      return hostnames;
    }
    @Override
    int getNumberofNewNodes() {
      return hostnames.length;
    }
    @Override
    int getNumberofIncludeNodes() {
      return nodesToBeIncluded.size();
    }
    @Override
    int getNumberofExcludeNodes() {
      return nodesToBeExcluded.size();
    }
  }

  /**
   * The number of data nodes to be started is specified.
   * The data nodes will have the same host name, but different port numbers.
   */
  static class PortNumberBasedNodes extends NewNodeInfo {
    int newNodes;
    int excludeNodes;
    int includeNodes;

    public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) {
      this.newNodes = newNodes;
      this.excludeNodes = excludeNodes;
      this.includeNodes = includeNodes;
    }

    @Override
    String[] getNames() {
      return null;
    }
    @Override
    int getNumberofNewNodes() {
      return newNodes;
    }
    @Override
    int getNumberofIncludeNodes() {
      return includeNodes;
    }
    @Override
    int getNumberofExcludeNodes() {
      return excludeNodes;
    }
  }

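  /** Convenience overload: no new-node specification and no host files. */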
  private void doTest(Configuration conf, long[] capacities, String[] racks,
      long newCapacity, String newRack, boolean useTool) throws Exception {
    doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
  }

  /** This test starts a cluster with a specified number of nodes
   * and fills it to be 30% full (with a single file replicated identically
   * to all datanodes);
   * It then adds one new empty node and starts balancing.
   *
   * @param conf - configuration
   * @param capacities - array of capacities of original nodes in cluster
   * @param racks - array of racks for original nodes in cluster
   * @param newCapacity - new node's capacity
   * @param newRack - new node's rack
   * @param nodes - information about new nodes to be started.
   * @param useTool - if true run test via Cli with command-line argument
   *   parsing, etc.   Otherwise invoke balancer API directly.
   * @param useFile - if true, the hosts to be included or excluded will be
   *   stored in a file and then later read from the file.
   * @throws Exception
   */
  private void doTest(Configuration conf, long[] capacities,
      String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
      boolean useTool, boolean useFile) throws Exception {
    LOG.info("capacities = " + long2String(capacities));
    LOG.info("racks      = " + Arrays.asList(racks));
    LOG.info("newCapacity= " + newCapacity);
    LOG.info("newRack    = " + newRack);
    LOG.info("useTool    = " + useTool);
    assertEquals(capacities.length, racks.length);
    int numOfDatanodes = capacities.length;
    cluster = new MiniDFSCluster.Builder(conf)
                                .numDataNodes(capacities.length)
                                .racks(racks)
                                .simulatedCapacities(capacities)
                                .build();
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      long totalCapacity = sum(capacities);

      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
          (short) numOfDatanodes, 0);

      if (nodes == null) { // there is no specification of new nodes.
        // start up an empty node with the same capacity and on the same rack
        cluster.startDataNodes(conf, 1, true, null,
            new String[]{newRack}, null, new long[]{newCapacity});
        totalCapacity += newCapacity;
      } else {
        // if running a test with an "include list", include original nodes as well
        if (nodes.getNumberofIncludeNodes() > 0) {
          for (DataNode dn : cluster.getDataNodes()) {
            nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName());
          }
        }
        String[] newRacks = new String[nodes.getNumberofNewNodes()];
        long[] newCapacities = new long[nodes.getNumberofNewNodes()];
        for (int i = 0; i < nodes.getNumberofNewNodes(); i++) {
          newRacks[i] = newRack;
          newCapacities[i] = newCapacity;
        }
        // if host names are specified for the new nodes to be created.
        if (nodes.getNames() != null) {
          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
              newRacks, nodes.getNames(), newCapacities);
          totalCapacity += newCapacity * nodes.getNumberofNewNodes();
        } else {  // host names are not specified
          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
              newRacks, null, newCapacities);
          totalCapacity += newCapacity * nodes.getNumberofNewNodes();
          // populate the include nodes
          if (nodes.getNumberofIncludeNodes() > 0) {
            int totalNodes = cluster.getDataNodes().size();
            for (int i = 0; i < nodes.getNumberofIncludeNodes(); i++) {
              nodes.getNodesToBeIncluded().add(cluster.getDataNodes().get(
                  totalNodes - 1 - i).getDatanodeId().getXferAddr());
            }
          }
          // populate the exclude nodes
          if (nodes.getNumberofExcludeNodes() > 0) {
            int totalNodes = cluster.getDataNodes().size();
            for (int i = 0; i < nodes.getNumberofExcludeNodes(); i++) {
              nodes.getNodesToBeExcluded().add(cluster.getDataNodes().get(
                  totalNodes - 1 - i).getDatanodeId().getXferAddr());
            }
          }
        }
      }
      // set up balancer parameters
      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
      if (nodes != null) {
        p = new Balancer.Parameters(
            Balancer.Parameters.DEFAULT.policy,
            Balancer.Parameters.DEFAULT.threshold,
            Balancer.Parameters.DEFAULT.maxIdleIteration,
            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
      }

      int expectedExcludedNodes = 0;
      if (nodes != null) {
        if (!nodes.getNodesToBeExcluded().isEmpty()) {
          expectedExcludedNodes = nodes.getNodesToBeExcluded().size();
        } else if (!nodes.getNodesToBeIncluded().isEmpty()) {
          expectedExcludedNodes =
              cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size();
        }
      }

      // run balancer and validate results
      if (useTool) {
        runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
      } else {
        runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
      }
    } finally {
      cluster.shutdown();
    }
  }

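  /* Run the balancer with default parameters and verify the result. */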
  private void runBalancer(Configuration conf,
      long totalUsedSpace, long totalCapacity) throws Exception {
    runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
  }

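  /* Run the balancer with the given parameters and wait until the cluster is
   * balanced; if zero concurrent moves are configured, expect NO_MOVE_PROGRESS
   * instead. */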
  private void runBalancer(Configuration conf,
      long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
      int excludedNodes) throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

    // start rebalancing
    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    final int r = runBalancer(namenodes, p, conf);
    if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) == 0) {
      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
      return;
    } else {
      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
    }
    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
    LOG.info("Rebalancing completed; verifying the cluster is balanced.");
    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
  }

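  /* Run balancing iterations directly, one per namenode connector, asserting
   * that after the first iteration bytes have actually been moved. */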
  private static int runBalancer(Collection<URI> namenodes, final Parameters p,
      Configuration conf) throws IOException, InterruptedException {
    final long sleeptime =
        conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
        conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
            DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
    LOG.info("namenodes  = " + namenodes);
    LOG.info("parameters = " + p);
    LOG.info("Print stack trace", new Throwable());

    System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");

    List<NameNodeConnector> connectors = Collections.emptyList();
    try {
      connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
          Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
          Balancer.Parameters.DEFAULT.maxIdleIteration);

      boolean done = false;
      for (int iteration = 0; !done; iteration++) {
        done = true;
        Collections.shuffle(connectors);
        for (NameNodeConnector nnc : connectors) {
          final Balancer b = new Balancer(nnc, p, conf);
          final Result r = b.runOneIteration();
          r.print(iteration, System.out);

          // clean all lists
          b.resetData(conf);
          if (r.exitStatus == ExitStatus.IN_PROGRESS) {
            done = false;
          } else if (r.exitStatus != ExitStatus.SUCCESS) {
            // must be an error status; return.
            return r.exitStatus.getExitCode();
          } else {
            if (iteration > 0) {
              assertTrue(r.bytesAlreadyMoved > 0);
            }
          }
        }

        if (!done) {
          Thread.sleep(sleeptime);
        }
      }
    } finally {
      for (NameNodeConnector nnc : connectors) {
        IOUtils.cleanup(LOG, nnc);
      }
    }
    return ExitStatus.SUCCESS.getExitCode();
  }

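  /* Run the balancer through its command-line interface, optionally passing
   * the include/exclude hosts via files, then verify the exit code and that
   * the cluster ends up balanced. */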
  private void runBalancerCli(Configuration conf,
      long totalUsedSpace, long totalCapacity,
      Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
    List<String> args = new ArrayList<String>();
    args.add("-policy");
    args.add("datanode");

    File excludeHostsFile = null;
    if (!p.nodesToBeExcluded.isEmpty()) {
      args.add("-exclude");
      if (useFile) {
        excludeHostsFile = new File("exclude-hosts-file");
        PrintWriter pw = new PrintWriter(excludeHostsFile);
        for (String host : p.nodesToBeExcluded) {
          pw.write(host + "\n");
        }
        pw.close();
        args.add("-f");
        args.add("exclude-hosts-file");
      } else {
        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
      }
    }

    File includeHostsFile = null;
    if (!p.nodesToBeIncluded.isEmpty()) {
      args.add("-include");
      if (useFile) {
        includeHostsFile = new File("include-hosts-file");
        PrintWriter pw = new PrintWriter(includeHostsFile);
        for (String host : p.nodesToBeIncluded) {
          pw.write(host + "\n");
        }
        pw.close();
        args.add("-f");
        args.add("include-hosts-file");
      } else {
        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
      }
    }

    final Tool tool = new Cli();
    tool.setConf(conf);
    final int r = tool.run(args.toArray(new String[0])); // start rebalancing

    assertEquals("Tools should exit 0 on success", 0, r);
    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
    LOG.info("Rebalancing with default ctor.");
    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);

    if (excludeHostsFile != null && excludeHostsFile.exists()) {
      excludeHostsFile.delete();
    }
    if (includeHostsFile != null && includeHostsFile.exists()) {
      includeHostsFile.delete();
    }
  }

  /** one-node cluster test */
  private void oneNodeTest(Configuration conf, boolean useTool) throws Exception {
    // add an empty node with half of the CAPACITY & the same rack
    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY / 2,
            RACK0, useTool);
  }

  /** two-node cluster test */
  private void twoNodeTest(Configuration conf) throws Exception {
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, false);
  }

  /** test using a user-supplied conf */
  public void integrationTest(Configuration conf) throws Exception {
    initConf(conf);
    oneNodeTest(conf, false);
  }

  /* we first start a cluster and fill the cluster up to a certain size.
   * then redistribute blocks according to the required distribution.
   * Then we start an empty datanode.
   * Afterwards a balancer is run to balance the cluster.
   * A partially filled datanode is excluded during balancing.
   * This triggers a situation where one of the block's locations is unknown.
   */
  @Test(timeout=100000)
  public void testUnknownDatanode() throws Exception {
    Configuration conf = new HdfsConfiguration();
    initConf(conf);
    long[] distribution = new long[] {50 * CAPACITY / 100, 70 * CAPACITY / 100, 0 * CAPACITY / 100};
    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY};
    String[] racks = new String[] {RACK0, RACK1, RACK1};

    int numDatanodes = distribution.length;
    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
      throw new IllegalArgumentException("Array lengths are not the same");
    }

    // calculate the total space that needs to be filled
    final long totalUsedSpace = sum(distribution);

    // fill the cluster
    ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace,
        (short) numDatanodes);

    // redistribute blocks
    Block[][] blocksDN = distributeBlocks(
        blocks, (short)(numDatanodes - 1), distribution);

    // restart the cluster: do NOT format the cluster
    conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
        .format(false)
        .racks(racks)
        .simulatedCapacities(capacities)
        .build();
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      for (int i = 0; i < numDatanodes; i++) {
        cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
      }

      cluster.startDataNodes(conf, 1, true, null,
          new String[]{RACK0}, null, new long[]{CAPACITY});
      cluster.triggerHeartbeats();

      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
      Set<String> datanodes = new HashSet<String>();
      datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
      Balancer.Parameters p = new Balancer.Parameters(
          Balancer.Parameters.DEFAULT.policy,
          Balancer.Parameters.DEFAULT.threshold,
          Balancer.Parameters.DEFAULT.maxIdleIteration,
          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
      final int r = Balancer.run(namenodes, p, conf);
      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test parse method in Balancer#Cli class with threshold value out of
   * boundaries.
   */
  @Test(timeout=100000)
  public void testBalancerCliParseWithThresholdOutOfBoundaries() {
    String[] parameters = new String[] { "-threshold", "0" };
    String reason = "IllegalArgumentException is expected when threshold value"
        + " is out of boundary.";
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      assertEquals("Number out of range: threshold = 0.0", e.getMessage());
    }
    parameters = new String[] { "-threshold", "101" };
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      assertEquals("Number out of range: threshold = 101.0", e.getMessage());
    }
  }

  /** Test a cluster with even distribution,
   * then a new empty node is added to the cluster */
  @Test(timeout=100000)
  public void testBalancer0() throws Exception {
    testBalancer0Internal(new HdfsConfiguration());
  }

  void testBalancer0Internal(Configuration conf) throws Exception {
    initConf(conf);
    oneNodeTest(conf, false);
    twoNodeTest(conf);
  }

  /** Test unevenly distributed cluster */
  @Test(timeout=100000)
  public void testBalancer1() throws Exception {
    testBalancer1Internal(new HdfsConfiguration());
  }

  void testBalancer1Internal(Configuration conf) throws Exception {
    initConf(conf);
    testUnevenDistribution(conf,
        new long[] {50 * CAPACITY / 100, 10 * CAPACITY / 100},
        new long[]{CAPACITY, CAPACITY},
        new String[] {RACK0, RACK1});
  }

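  /** Test the balancer with zero concurrent block moves configured on the
   *  datanodes; the balancer should report no move progress. */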
  @Test(timeout=100000)
  public void testBalancerWithZeroThreadsForMove() throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
    testBalancer1Internal(conf);
  }

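  /** Test the balancer with 8 concurrent block moves per datanode. */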
  @Test(timeout=100000)
  public void testBalancerWithNonZeroThreadsForMove() throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
    testBalancer1Internal(conf);
  }

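  /** Test a cluster with even distribution, then a new empty node is added,
   *  exercising the default-parameter balancer code path. */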
  @Test(timeout=100000)
  public void testBalancer2() throws Exception {
    testBalancer2Internal(new HdfsConfiguration());
  }

  void testBalancer2Internal(Configuration conf) throws Exception {
    initConf(conf);
    testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
        new String[] { RACK0, RACK1 }, CAPACITY, RACK2);
  }

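  /* Start a cluster, fill it to 30% full, add an empty node with the given
   * capacity and rack, then run the balancer with default parameters. */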
  private void testBalancerDefaultConstructor(Configuration conf,
      long[] capacities, String[] racks, long newCapacity, String newRack)
      throws Exception {
    int numOfDatanodes = capacities.length;
    assertEquals(numOfDatanodes, racks.length);
    cluster = new MiniDFSCluster.Builder(conf)
                                .numDataNodes(capacities.length)
                                .racks(racks)
                                .simulatedCapacities(capacities)
                                .build();
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
          ClientProtocol.class).getProxy();

      long totalCapacity = sum(capacities);

      // fill up the cluster to be 30% full
      long totalUsedSpace = totalCapacity * 3 / 10;
      createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
          (short) numOfDatanodes, 0);
      // start up an empty node with the same capacity and on the same rack
      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
          new long[] { newCapacity });

      totalCapacity += newCapacity;

      // run balancer and validate results
      runBalancer(conf, totalUsedSpace, totalCapacity);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Verify balancer exits 0 on success.
   */
  @Test(timeout=100000)
  public void testExitZeroOnSuccess() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    initConf(conf);

    oneNodeTest(conf, true);
  }

  /**
   * Test parse method in Balancer#Cli class with wrong number of params
   */
  @Test
  public void testBalancerCliParseWithWrongParams() {
    String[] parameters = new String[] { "-threshold" };
    String reason =
        "IllegalArgumentException is expected when value is not specified";
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }
    parameters = new String[] { "-policy" };
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }
    parameters = new String[] {"-threshold", "1", "-policy"};
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }
    parameters = new String[] {"-threshold", "1", "-include"};
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }
    parameters = new String[] {"-threshold", "1", "-exclude"};
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }
    parameters = new String[] {"-include", "-f"};
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }
    parameters = new String[] {"-exclude", "-f"};
    try {
      Balancer.Cli.parse(parameters);
      fail(reason);
    } catch (IllegalArgumentException e) {
      // expected
    }

    parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"};
    try {
      Balancer.Cli.parse(parameters);
      fail("IllegalArgumentException is expected when both -exclude and -include are specified");
    } catch (IllegalArgumentException e) {
      // expected
    }
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the exclude list
   */
  @Test(timeout=100000)
  public void testBalancerWithExcludeList() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    Set<String> excludeHosts = new HashSet<String>();
    excludeHosts.add("datanodeY");
    excludeHosts.add("datanodeZ");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the exclude list
   */
  @Test(timeout=100000)
  public void testBalancerWithExcludeListWithPorts() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the exclude list
   */
  @Test(timeout=100000)
  public void testBalancerCliWithExcludeList() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    Set<String> excludeHosts = new HashSet<String>();
    excludeHosts.add("datanodeY");
    excludeHosts.add("datanodeZ");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
      new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
      Parameters.DEFAULT.nodesToBeIncluded), true, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the exclude list
   */
  @Test(timeout=100000)
  public void testBalancerCliWithExcludeListWithPorts() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the exclude list in a file
   */
  @Test(timeout=100000)
  public void testBalancerCliWithExcludeListInAFile() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    Set<String> excludeHosts = new HashSet<String>();
    excludeHosts.add("datanodeY");
    excludeHosts.add("datanodeZ");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the exclude list in a file
   */
  @Test(timeout=100000)
  public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the include list
   */
  @Test(timeout=100000)
  public void testBalancerWithIncludeList() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    Set<String> includeHosts = new HashSet<String>();
    includeHosts.add("datanodeY");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the include list
   */
  @Test(timeout=100000)
  public void testBalancerWithIncludeListWithPorts() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the include list
   */
  @Test(timeout=100000)
  public void testBalancerCliWithIncludeList() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    Set<String> includeHosts = new HashSet<String>();
    includeHosts.add("datanodeY");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the include list
   */
  @Test(timeout=100000)
  public void testBalancerCliWithIncludeListWithPorts() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the include list in a file
   */
  @Test(timeout=100000)
  public void testBalancerCliWithIncludeListInAFile() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    Set<String> includeHosts = new HashSet<String>();
    includeHosts.add("datanodeY");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
  }

  /**
   * Test a cluster with even distribution,
   * then three nodes are added to the cluster,
   * runs balancer with two of the nodes in the include list in a file
   */
  @Test(timeout=100000)
  public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
  }

  /*
   * Test Balancer with RAM_DISK configured.
   * One DN has two files on RAM_DISK, the other DN has no files on RAM_DISK.
   * Then verify that the balancer does not migrate files on RAM_DISK across DNs.
   */
  @Test(timeout=300000)
  public void testBalancerWithRamDisk() throws Exception {
    final int SEED = 0xFADED;
    final short REPL_FACT = 1;
    Configuration conf = new Configuration();
    initConfWithRamDisk(conf);

    final int defaultRamDiskCapacity = 10;
    final long ramDiskStorageLimit =
      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
    final long diskStorageLimit =
      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);

    cluster = new MiniDFSCluster
      .Builder(conf)
      .numDataNodes(1)
      .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
      .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
      .build();

    try {
      cluster.waitActive();
      // Create a few files on RAM_DISK
      final String METHOD_NAME = GenericTestUtils.getMethodName();
      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
      final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");

      DistributedFileSystem fs = cluster.getFileSystem();
      DFSClient client = fs.getClient();
      DFSTestUtil.createFile(fs, path1, true,
        DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
      DFSTestUtil.createFile(fs, path2, true,
        DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);

      // Sleep for a short time to allow the lazy writer thread to do its job
      Thread.sleep(6 * 1000);

      // Add another fresh DN with the same type/capacity without files on RAM_DISK
      StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
      long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}};
      cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
        null, null, storageCapacities, null, false, false, false, null);

      cluster.triggerHeartbeats();
      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);

      // Run Balancer
      Balancer.Parameters p = new Balancer.Parameters(
        Parameters.DEFAULT.policy,
        Parameters.DEFAULT.threshold,
        Balancer.Parameters.DEFAULT.maxIdleIteration,
        Parameters.DEFAULT.nodesToBeExcluded,
        Parameters.DEFAULT.nodesToBeIncluded);
      final int r = Balancer.run(namenodes, p, conf);

      // Validate that no RAM_DISK block was moved
      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);

      // Verify files are still on RAM_DISK
      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
    } finally {
      cluster.shutdown();
    }
  }
1314 
  /**
   * Test a special case: two replicas of the same block should not end up
   * on the same node.
   * We have 2 nodes.
   * We have a block with replicas in (DN0,SSD) and (DN1,DISK).
   * The replica in (DN0,SSD) should not be moved to (DN1,SSD).
   * Otherwise DN1 would hold 2 replicas.
   */
  @Test(timeout=100000)
  public void testTwoReplicaShouldNotInSameDN() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    int blockSize = 5 * 1024 * 1024;
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);

    int numOfDatanodes = 2;
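    // DN0 gets a large SSD and a small DISK while DN1 gets the reverse, so
    // the per-storage-type utilization is skewed and the balancer is tempted
    // to move the SSD replica from DN0 to DN1.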
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numOfDatanodes)
        .racks(new String[]{"/default/rack0", "/default/rack0"})
        .storagesPerDatanode(2)
        .storageTypes(new StorageType[][]{
            {StorageType.SSD, StorageType.DISK},
            {StorageType.SSD, StorageType.DISK}})
        .storageCapacities(new long[][]{
            {100 * blockSize, 20 * blockSize},
            {20 * blockSize, 100 * blockSize}})
        .build();

    try {
      cluster.waitActive();

      // Set the "/bar" directory to the ONE_SSD storage policy.
      DistributedFileSystem fs = cluster.getFileSystem();
      Path barDir = new Path("/bar");
      fs.mkdir(barDir, new FsPermission((short) 0777));
      fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
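      // ONE_SSD places one replica of each block on SSD and the remaining
      // replicas on DISK.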

      // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
      // and (DN0,DISK) and (DN1,SSD) are about 15% full.
      long fileLen = 30 * blockSize;
      // fooFile has the ONE_SSD policy. So
      // (DN0,SSD) and (DN1,DISK) each hold a replica of the same block, and
      // (DN0,DISK) and (DN1,SSD) each hold a replica of the same block.
      Path fooFile = new Path(barDir, "foo");
      createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
      // update space info
      cluster.triggerHeartbeats();

      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
      final int r = Balancer.run(namenodes, p, conf);

      // The replica in (DN0,SSD) is not moved to (DN1,SSD) because (DN1,DISK)
      // already has a replica of the same block; otherwise DN1 would hold 2
      // replicas. For the same reason, no replicas are moved at all.
      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);

    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test running multiple balancers simultaneously.
   *
   * Case-1: A first balancer is running. Running a second one should then
   * fail immediately with the IOException "Another balancer is running.
   * Exiting..".
   *
   * Case-2: When the second balancer runs, the 'balancer.id' file exists
   * but the lease on it does not. The second balancer should then run
   * successfully.
   */
  @Test(timeout = 100000)
  public void testManyBalancerSimultaneously() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    initConf(conf);
    // Add an empty node with half of the existing capacity (the existing
    // node has 4 * CAPACITY, the new one 2 * CAPACITY) on the same rack.
    long[] capacities = new long[] { 4 * CAPACITY };
    String[] racks = new String[] { RACK0 };
    long newCapacity = 2 * CAPACITY;
    String newRack = RACK0;
    LOG.info("capacities = " + long2String(capacities));
    LOG.info("racks      = " + Arrays.asList(racks));
    LOG.info("newCapacity= " + newCapacity);
    LOG.info("newRack    = " + newRack);
    LOG.info("useTool    = " + false);
    assertEquals(capacities.length, racks.length);
    int numOfDatanodes = capacities.length;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
        .racks(racks).simulatedCapacities(capacities).build();
    try {
      cluster.waitActive();
      client = NameNodeProxies.createProxy(conf,
          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();

      long totalCapacity = sum(capacities);

      // fill up the cluster to be 30% full
      final long totalUsedSpace = totalCapacity * 3 / 10;
      createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
          (short) numOfDatanodes, 0);
      // start up an empty node with capacity newCapacity on the same rack
      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
          new long[] { newCapacity });

      // Case1: Simulate the first balancer by creating the 'balancer.id'
      // file. A real balancer keeps this file until balancing completes.
      FileSystem fs = cluster.getFileSystem(0);
      final FSDataOutputStream out = fs
          .create(Balancer.BALANCER_ID_PATH, false);
      out.writeBytes(InetAddress.getLocalHost().getHostName());
      out.hflush();
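      // Keeping the stream open holds the write lease on 'balancer.id',
      // which is how a live balancer advertises itself to other balancers.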
      assertTrue("'balancer.id' file doesn't exist!",
          fs.exists(Balancer.BALANCER_ID_PATH));

      // start second balancer
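      // "-policy datanode" balances utilization at the datanode level (the
      // default policy); the alternative is "blockpool" for federated setups.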
      final String[] args = { "-policy", "datanode" };
      final Tool tool = new Cli();
      tool.setConf(conf);
      int exitCode = tool.run(args); // start balancing
      assertEquals("Exit status code mismatches",
          ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);

      // Case2: Release the lease so that another balancer will be able to
      // perform balancing.
      out.close();
      assertTrue("'balancer.id' file doesn't exist!",
          fs.exists(Balancer.BALANCER_ID_PATH));
      exitCode = tool.run(args); // start balancing
      assertEquals("Exit status code mismatches",
          ExitStatus.SUCCESS.getExitCode(), exitCode);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Run a few of the balancer tests standalone, outside of JUnit.
   * @param args ignored
   */
  public static void main(String[] args) throws Exception {
    TestBalancer balancerTest = new TestBalancer();
    balancerTest.testBalancer0();
    balancerTest.testBalancer1();
    balancerTest.testBalancer2();
  }
}