/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;

import java.security.SecureRandom;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.ArrayUtils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hdfs.test.system.HDFSCluster;
import org.apache.hadoop.hdfs.test.system.NNClient;
import org.apache.hadoop.hdfs.test.system.DNClient;


import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;

public class TestBalancer {

    private static final Log LOG = LogFactory.getLog(TestBalancer.class);
    private static final String BALANCER_TEMP_DIR = "balancer-temp";
    private Configuration hadoopConf;
    private HDFSCluster dfsCluster;

    public TestBalancer() throws Exception {
    }

    @Before
    public void setUp() throws Exception {
        hadoopConf = new Configuration();
        dfsCluster = HDFSCluster.createCluster(hadoopConf);
        dfsCluster.setUp();
    }

    @After
    public void tearDown() throws Exception {
        dfsCluster.tearDown();
    }

    // Trivial @Test
    public void testNamenodePing() throws IOException {
        LOG.info("testing filesystem ping");
        NNClient namenode = dfsCluster.getNNClient();
        namenode.ping();
        LOG.info("done.");
    }

    // Trivial @Test
    public void testNamenodeConnectDisconnect() throws IOException {
        LOG.info("connecting to namenode");
        NNClient namenode = dfsCluster.getNNClient();
        namenode.connect();
        LOG.info("done.");
        LOG.info("disconnecting from namenode");
        namenode.disconnect();
    }

    /**
     * The basic scenario for the balancer test is as follows:
     *
     *  - Bring up cluster with 1 DataNode
     *  - Load DataNode to >50%
     *  - Count files/blocks on DataNode
     *  - Add new, empty DataNode to cluster
     *  - Run Balancer
     *  - Count files/blocks on DataNodes
     *  - Block counts from before and after the Balancer run should be consistent
     *
     */
    @Test
    public void testBalancerBasicScenario() throws IOException {
        Path balancerTempDir = null;
        try {
            List<DNClient> testnodes = reserveDatanodesForTest(2);
            DNClient testnode1 = testnodes.get(0);
            DNClient testnode2 = testnodes.get(1);
            shutdownNonTestNodes(testnodes);

            LOG.info("attempting to kill both test nodes");
            stopDatanode(testnode1);
            stopDatanode(testnode2);

            LOG.info("starting up datanode ["
                    + testnode1.getHostName()
                    + "] and loading it with data");
            startDatanode(testnode1);

            // mkdir balancer-temp
            balancerTempDir = makeTempDir();
            // write 2 blocks to file system
            LOG.info("generating filesystem load");
            // TODO spec blocks to generate by blockCount, blockSize, # of writers
            generateFileSystemLoad(2);  // generate 2 blocks of test data

            LOG.info("measure space used on 1st node");
            long usedSpace0 = getDatanodeUsedSpace(testnode1);
            LOG.info("datanode " + testnode1.getHostName()
                    + " contains " + usedSpace0 + " bytes");

            LOG.info("bring up a 2nd node and run balancer on DFS");
            startDatanode(testnode2);
            runBalancerAndVerify(testnodes);
        } catch (Throwable t) {
            // fail the test rather than silently swallowing the error
            LOG.error("method testBalancerBasicScenario failed", t);
            Assert.fail("testBalancerBasicScenario failed: " + t.getMessage());
        } finally {
            // finally block to run cleanup
            LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
            try {
                deleteTempDir(balancerTempDir);
            } catch (Exception e) {
                LOG.warn("problem cleaning up temp dir", e);
            }

            // restart killed nodes
            Iterator<DNClient> iter = dfsCluster.getDNClients().iterator();

            while (iter.hasNext()) {
                DNClient dn = iter.next();
                startDatanode(dn);
            }
        }
    }

    private void shutdownNonTestNodes(List<DNClient> testnodes) {
        Set<DNClient> killSet = new HashSet<DNClient>(getAllDatanodes());
        killSet.removeAll(testnodes);
        LOG.info("attempting to kill/suspend all the nodes not used for this test");
        Iterator<DNClient> iter = killSet.iterator();
        DNClient dn = null;
        while (iter.hasNext()) {
            dn = iter.next();
            // kill may not work with some secure-HDFS configs,
            // so using our stopDatanode() method
            stopDatanode(dn);
        }
    }

    /**
     * Select reservationCount datanodes to keep alive for the test and
     * return them as a list; the caller may then shut down the rest.
     */
    private List<DNClient> reserveDatanodesForTest(int reservationCount) {
        List<DNClient> testDNs = new LinkedList<DNClient>();
        LOG.info("getting collection of live data nodes");
        List<DNClient> dnList = getAllDatanodes();
        int dnCount = dnList.size();
        //  check to make sure there is enough capacity on these nodes to run test
        Assert.assertTrue(
                String.format(
                    "not enough datanodes available to run test,"
                    + " need %d datanodes but have only %d available",
                    reservationCount, dnCount),
                ( dnCount >= reservationCount ));
        LOG.info("selecting " + reservationCount + " nodes for test");
        List<DNClient> dieDNs = new LinkedList<DNClient>(dnList);

        // pick reservationCount distinct nodes at random; each selected node
        // is removed from dieDNs so it cannot be chosen twice
        for (int k = 0; k < reservationCount; k++) {
            int i = getRandom(dieDNs.size());
            DNClient testDN = dieDNs.get(i);
            testDNs.add(testDN);
            dieDNs.remove(testDN);
        }

        LOG.info("nodes reserved for test");
        printDatanodeList(testDNs);

        LOG.info("nodes not used in test");
        printDatanodeList(dieDNs);

        return testDNs;
    }

    private List<DNClient> getAllDatanodes() {
        return dfsCluster.getDNClients();
    }

    private final static DNClient[] DATANODE_ARRAY = {};
    private DNClient[] toDatanodeArray(List<DNClient> datanodeList) {
        return (DNClient[]) datanodeList.toArray(DATANODE_ARRAY);
    }

    /**
     * Return a random integer in the range [0, n).
     *
     * @param n  exclusive upper bound on the returned value
     * @return random integer between 0 (inclusive) and n (exclusive)
     */
    private int getRandom(int n) {
        return (int) (n * Math.random());
    }

    /**
     * Determine whether the error between expected and observed values is
     * within tolerance
     *
     * @param expectedValue  expected value of experiment
     * @param observedValue  observed value of experiment
     * @param tolerance      percent tolerance for error, expressed as an int
     * @return true if the observed value is within tolerance of the expected value
     */
    private boolean withinTolerance(long expectedValue,
                                    long observedValue,
                                    int tolerance) {
        double diff = 1.0 * Math.abs(observedValue - expectedValue);
        // divide by 100.0 so the tolerance is not truncated by integer division
        double thrs = expectedValue * (tolerance / 100.0);
        return diff <= thrs;
    }

    //  emulate tolerance calculation in balancer code
    public final static int DEFAULT_TOLERANCE = 10; // 10%
    protected boolean isClusterBalanced(DNClient[] datanodes) throws IOException {
        return isClusterBalanced(datanodes, DEFAULT_TOLERANCE);
    }
    protected boolean isClusterBalanced(DNClient[] datanodes, int tolerance)
            throws IOException {

        Assert.assertFalse("empty datanode array specified",
                ArrayUtils.isEmpty(datanodes));
        boolean result = true;
        double[] utilizationByNode = new double[ datanodes.length ];
        double totalUsedSpace = 0L;
        double totalCapacity = 0L;
        // accumulate space stored on each node
        for(int i=0; i<datanodes.length; i++) {
            DNClient datanode = datanodes[i];
            Map volumeInfoMap = getDatanodeVolumeAttributes(datanode);
            long usedSpace = (Long)volumeInfoMap.get(ATTRNAME_USED_SPACE);
            long capacity  = (Long)volumeInfoMap.get(ATTRNAME_CAPACITY);
            utilizationByNode[i] = ( ((double)usedSpace)/capacity ) * 100;
            totalUsedSpace += usedSpace;
            totalCapacity  += capacity;
        }
        // here we are reusing previously fetched volume-info, for speed;
        // an alternative is to get fresh values from the cluster here instead
        double avgUtilization = ( totalUsedSpace/totalCapacity ) * 100;
        for(int i=0; i<datanodes.length; i++) {
            double varUtilization = Math.abs(avgUtilization - utilizationByNode[i]);
            if(varUtilization > tolerance) {
                result = false;
                break;
            }
        }

        return result;
    }

    /**
     * Make a working directory for storing temporary files
     *
     * @throws IOException
     */
    private Path makeTempDir() throws IOException {
        Path temp = new Path(BALANCER_TEMP_DIR);
        FileSystem srcFs = temp.getFileSystem(hadoopConf);
        FileStatus fstatus = null;
        try {
            fstatus = srcFs.getFileStatus(temp);
            if (fstatus.isDir()) {
                LOG.warn(BALANCER_TEMP_DIR + ": File exists");
            } else {
                LOG.warn(BALANCER_TEMP_DIR + " exists but is not a directory");
            }
            deleteTempDir(temp);
        } catch (FileNotFoundException fileNotFoundExc) {
            // the temp dir does not exist yet, so there is nothing to clean up
        } finally {
            if (!srcFs.mkdirs(temp)) {
                throw new IOException("failed to create " + BALANCER_TEMP_DIR);
            }
        }
        return temp;
    }

    /**
     * Remove the working directory used to store temporary files
     *
     * @param temp
     * @throws IOException
     */
    private void deleteTempDir(Path temp) throws IOException {
        FileSystem srcFs = temp.getFileSystem(hadoopConf);
        LOG.info("attempting to delete path " + temp + "; this path exists? -> " + srcFs.exists(temp));
        srcFs.delete(temp, true);
    }

    private void printDatanodeList(List<DNClient> lis) {
        for (DNClient datanode : lis) {
            LOG.info("\t" + datanode.getHostName());
        }
    }

    private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin";
    private void stopDatanode(DNClient dn) {
        String dnHost = dn.getHostName();
        runAndWatch(dnHost, CMD_STOP_DN);
    }

    private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin";
    private void startDatanode(DNClient dn) {
        String dnHost = dn.getHostName();
        runAndWatch(dnHost, CMD_START_DN);
    }

    /* using "old" default block size of 64M */
    private static final int DFS_BLOCK_SIZE = 67108864;
    private static final short DEFAULT_REPLICATION = 3;
    private void generateFileSystemLoad(long numBlocks) {
        generateFileSystemLoad(numBlocks, DEFAULT_REPLICATION);
    }

    private void generateFileSystemLoad(long numBlocks, short replication) {
        String destfile = "hdfs:///user/hadoopqa/";// + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
        SecureRandom randgen = new SecureRandom();
        ByteArrayOutputStream dat = null;
        ByteArrayInputStream in = null;
        OutputStream out = null;
        final int CHUNK = 4096;
        final Configuration testConf = new Configuration(hadoopConf);
        try {
            testConf.setInt("dfs.replication", replication);
            // create the destination file once and stream numBlocks blocks of
            // random data into it; re-creating the file on every iteration
            // would overwrite the previously written block
            FileSystem fs = FileSystem.get(URI.create(destfile), testConf);
            out = fs.create(
                    new Path(destfile),
                    replication,
                    new ProgressReporter());
            for (int i = 0; i < numBlocks; i++) {
                dat = new ByteArrayOutputStream(DFS_BLOCK_SIZE);
                for (int z = 0; z < DFS_BLOCK_SIZE; z += CHUNK) {
                    byte[] bytes = new byte[CHUNK];
                    randgen.nextBytes(bytes);
                    dat.write(bytes, 0, CHUNK);
                }

                in = new ByteArrayInputStream(dat.toByteArray());
                // keep the output stream open until all blocks have been written
                IOUtils.copyBytes(in, out, CHUNK, false);
                LOG.info("wrote block " + (i + 1) + " of " + numBlocks);
            }
        } catch (IOException ioExc) {
            LOG.warn("f/s loadgen failed!", ioExc);
        } finally {
            try {
                out.close();
            } catch (Exception e) {
            }
            try {
                dat.close();
            } catch (Exception e) {
            }
            try {
                in.close();
            } catch (Exception e) {
            }
        }
    }
    // TODO this should be taken from the environment
    public final static String HADOOP_HOME = "/grid/0/gs/gridre/yroot.biga/share/hadoop-current";
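    // A possible way to address the TODO above, kept as a comment so the
    // hard-coded default is not silently changed: prefer the HADOOP_HOME
    // environment variable when it is set and fall back to this constant, e.g.
    //   String hadoopHome = (System.getenv("HADOOP_HOME") != null)
    //           ? System.getenv("HADOOP_HOME") : HADOOP_HOME;
    // (untested sketch; CMD_HADOOP below would then need to be built from the
    // resolved value instead of the compile-time constant)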
    public final static String CMD_SSH = "/usr/bin/ssh";
    public final static String CMD_KINIT = "/usr/kerberos/bin/kinit";
    public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop";
    public final static String OPT_BALANCER = "balancer";
    public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
    public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";

    public final static int DEFAULT_THRESHOLD = 10;

    private int runBalancer() throws IOException {
        return runBalancer(DEFAULT_THRESHOLD);
    }

    private int runBalancer(int threshold) throws IOException {
        return runBalancer("" + threshold);
    }

    /*
     * TODO change the heap size balancer uses so it can run on gateways
     * i.e., 14G heap is too big for gateways
     */
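    // A possible approach to the heap-size TODO above (untested sketch): the
    // bin/hadoop launcher honors the HADOOP_HEAPSIZE environment variable
    // (a value in MB), so the remote command could be prefixed with something
    // like
    //   "export HADOOP_HEAPSIZE=2048; " + CMD_HADOOP + " " + OPT_BALANCER + ...
    // to cap the balancer JVM heap when it is run on a gateway host.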
    private int runBalancer(String threshold)
            throws IOException {

        // kinit with the test keytab, then run the balancer on the namenode host
        String balancerCommand = String.format("%s -k -t %s %s; %s %s -threshold %s",
                CMD_KINIT,
                KERB_KEYTAB,
                KERB_PRINCIPAL,
                CMD_HADOOP,
                OPT_BALANCER,
                threshold);
        String nnHost = dfsCluster.getNNClient().getHostName();
        return runAndWatch(nnHost, balancerCommand);
    }
    private void runBalancerAndVerify(List<DNClient> testnodes)
            throws IOException {
        runBalancerAndVerify(testnodes, DEFAULT_THRESHOLD);
    }

    private void runBalancerAndVerify(List<DNClient> testnodes, int threshold)
            throws IOException {
        runBalancerAndVerify(testnodes, "" + threshold);
    }

    private void runBalancerAndVerify(List<DNClient> testnodes, String threshold)
            throws IOException {
        int exitStatus = runBalancer(threshold);
        // assert balancer exits with status SUCCESS
        Assert.assertTrue(
                String.format("balancer returned non-success exit code: %d",
                exitStatus),
                (exitStatus == SUCCESS));
        DNClient[] testnodeArr = toDatanodeArray(testnodes);
        Assert.assertTrue(
                "cluster is not balanced",
                isClusterBalanced(testnodeArr));
    }

    private int runAndWatch(String remoteHost, String remoteCommand) {
        int exitStatus = -1;
        try {
            Process proc = new ProcessBuilder(CMD_SSH, remoteHost, remoteCommand).start();
            watchProcStream(proc.getInputStream(), System.out);
            watchProcStream(proc.getErrorStream(), System.err);
            exitStatus = proc.waitFor();
        } catch(InterruptedException intExc) {
            LOG.warn("got thread interrupt error", intExc);
        } catch(IOException ioExc) {
            LOG.warn("got i/o error", ioExc);
        }
        return exitStatus;
    }

    private void watchProcStream(InputStream in, PrintStream out) {
        new Thread(new StreamWatcher(in, out)).start();
    }
    private static final String DATANODE_VOLUME_INFO = "VolumeInfo";
    private static final String ATTRNAME_USED_SPACE  = "usedSpace";
    private static final String ATTRNAME_FREE_SPACE  = "freeSpace";
    // pseudo attribute, JMX doesn't really provide this
    private static final String ATTRNAME_CAPACITY    = "capacity";
    // TODO maybe the static methods below belong in some utility class...
    private static long getDatanodeUsedSpace(DNClient datanode)
            throws IOException {
        return (Long) getDatanodeVolumeAttributes(datanode).get(ATTRNAME_USED_SPACE);
    }
    /*
    private static long getDatanodeFreeSpace(DNClient datanode)
            throws IOException {
        return (Long) getDatanodeVolumeAttributes(datanode).get(ATTRNAME_FREE_SPACE);
    }
    */
    private static Map getDatanodeVolumeAttributes(DNClient datanode)
            throws IOException {
        Map result = new HashMap();
        long usedSpace = getVolumeAttribute(datanode, ATTRNAME_USED_SPACE);
        long freeSpace = getVolumeAttribute(datanode, ATTRNAME_FREE_SPACE);
        result.put(ATTRNAME_USED_SPACE, usedSpace);
        result.put(ATTRNAME_CAPACITY,   usedSpace + freeSpace);
        return result;
    }

    private static long getVolumeAttribute(DNClient datanode,
                                           String attribName)
        throws IOException {

        Object volInfo = datanode.getDaemonAttribute(DATANODE_VOLUME_INFO);
        Assert.assertNotNull(
                String.format("Attribute \"%s\" should be non-null",
                              DATANODE_VOLUME_INFO),
                volInfo);
        String strVolInfo = volInfo.toString();
        LOG.debug(String.format("Value of %s: %s",
                  DATANODE_VOLUME_INFO,
                  strVolInfo));
        // the attribute value is a JSON map of per-volume info;
        // sum the requested attribute across all volumes on the datanode
        Map volInfoMap = (Map) JSON.parse(strVolInfo);
        long attrVal = 0L;
        for (Object key : volInfoMap.keySet()) {
            Map attrMap = (Map) volInfoMap.get(key);
            long val = (Long) attrMap.get(attribName);
            attrVal += val;
        }
        return attrVal;
    }
    /** simple utility to watch streams from an exec'ed process */
    static class StreamWatcher implements Runnable {

        private BufferedReader reader;
        private PrintStream printer;

        StreamWatcher(InputStream in, PrintStream out) {
            reader = getReader(in);
            printer = out;
        }

        private static BufferedReader getReader(InputStream in) {
            return new BufferedReader(new InputStreamReader(in));
        }

        public void run() {
            try {
                // drain the stream line by line so the child process
                // cannot block on a full pipe buffer
                String line;
                while ((line = reader.readLine()) != null) {
                    printer.println(line);
                }
            } catch (IOException ioExc) {
                // stream closed; stop watching
            }
        }
    }

    /** simple utility to report progress in generating data */
    static class ProgressReporter implements Progressable {

        StringBuffer buf = null;

        public void progress() {
            if (buf == null) {
                buf = new StringBuffer();
            }
            buf.append(".");
            if (buf.length() == 10000) {
                LOG.info("..........");
                buf = null;
            }
        }
    }

    // A constant for the SUCCESS exit code
    static final int SUCCESS = 1;

    /**
     * Balancer_01
     * Start balancer and check if the cluster is balanced after the run.
     * Cluster should end up in balanced state.
     */
    @Test
    public void testBalancerSimple() throws IOException {

        DNClient[] datanodes = toDatanodeArray(getAllDatanodes());
        int exitStatus = runBalancer();
        // assert on successful exit code here
        Assert.assertTrue(
                String.format("balancer returned non-success exit code: %d",
                              exitStatus),
                (exitStatus == SUCCESS));
        Assert.assertTrue("cluster is not balanced", isClusterBalanced(datanodes));

    }

    /**
     * Balancer_02
     * Test a cluster with even distribution, then add a new empty node to
     * the cluster. Here, even distribution effectively means the cluster is
     * in a "balanced" state, as bytes consumed for block allocation are
     * evenly distributed throughout the cluster.
     */
    @Test
    public void testBalancerEvenDistributionWithNewNodeAdded() throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");

        // get all nodes
        // need to get an external reserve of nodes we can boot up
        // to add to this cluster?
        // HOW?

        // IDEA try to steal some nodes from omega-M for now.....
        // hmmm also need a way to give an alternate "empty-node" config
        // to "hide" the data that may already exist on this node
    }

    /**
     * Balancer_03
     * Bring up a 1-node DFS cluster. Set the file replication factor to 1
     * and fill the node up to 30% full. Then add an empty datanode.
     */
    @Test
    public void testBalancerSingleNodeClusterWithNewNodeAdded() throws IOException {
        // empty datanode: mod config to point to non-default blocks dir.
        // limit capacity to available storage space
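        // Untested outline of the scenario described above, using helpers from
        // this class; it is left commented out because single-node capacity
        // control and the "empty node" configuration are not available yet
        // (blocksFor30PercentFull below is a hypothetical value):
        //   List<DNClient> testnodes = reserveDatanodesForTest(2);
        //   DNClient loadedNode = testnodes.get(0);
        //   DNClient emptyNode  = testnodes.get(1);
        //   stopDatanode(emptyNode);
        //   generateFileSystemLoad(blocksFor30PercentFull, (short) 1);
        //   startDatanode(emptyNode);
        //   runBalancerAndVerify(testnodes);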
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_04
     * The same as _03 except that the empty new data node is on a
     * different rack.
     */
    @Test
    public void testBalancerSingleNodeClusterWithNewNodeAddedFromDifferentRack()
            throws IOException {
        // need rack awareness
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_05
     * The same as _03 except that the empty new data node is half the
     * capacity of the old one.
     */
    @Test
    public void testBalancerSingleNodeClusterWithHalfCapacityNewNode() {
        // how to limit node capacity?
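        // One way the capacity question above might be handled (untested
        // sketch, not wired up here): HDFS supports dfs.datanode.du.reserved,
        // which reserves a number of bytes per volume that the datanode will
        // not use, so a "half capacity" node could be approximated by starting
        // the new datanode with that property set to about half the volume size.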
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_06
     * Bring up a 2-node cluster and fill one node to be 60% and the
     * other to be 10% full. All nodes are on different racks.
     */
    @Test
    public void testBalancerTwoNodeMultiRackCluster() {
        // need rack awareness
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_07
     * Bring up a dfs cluster with nodes A and B. Set file replication
     * factor to be 2 and fill up the cluster to 30% full. Then add an
     * empty data node C. All three nodes are on the same rack.
     */
    @Test
    public void testBalancerTwoNodeSingleRackClusterWithNewNodeAdded()
            throws IOException {

        final short TEST_REPLICATION_FACTOR = 3;
        List<DNClient> testnodes = reserveDatanodesForTest(3);
        DNClient dnA = testnodes.get(0);
        DNClient dnB = testnodes.get(1);

        DNClient dnC = testnodes.get(2);
        stopDatanode(dnC);

        // change test: 30% full-er (ie, 30% over pre-test capacity),
        // use the most heavily loaded node as the baseline;
        // divide by DFS_BLOCK_SIZE as a double so the block count is not
        // truncated to zero by integer division
        long targetLoad = (long) (
                (1.0 / DFS_BLOCK_SIZE) *
                0.30 *
                Math.max(getDatanodeUsedSpace(dnA), getDatanodeUsedSpace(dnB)));
        generateFileSystemLoad(targetLoad, TEST_REPLICATION_FACTOR);
        startDatanode(dnC);
        runBalancerAndVerify(testnodes);
    }

    /**
     * Balancer_08
     * The same as _07 except that A, B and C are on different racks.
     */
    @Test
    public void testBalancerTwoNodeMultiRackClusterWithNewNodeAdded()
            throws IOException {
        // need rack awareness
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_09
     * The same as _07 except that balancing is interrupted.
     */
    @Test
    public void testBalancerTwoNodeSingleRackClusterInterruptingRebalance()
            throws IOException {
        // interrupt thread
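        // Untested sketch of how the interruption might be driven once the
        // Balancer_07 scenario is reusable: run the balancer from a background
        // thread and kill it part-way through (names below are hypothetical):
        //   Thread balancerThread = new Thread(new Runnable() {
        //       public void run() { try { runBalancer(); } catch (IOException e) { } }
        //   });
        //   balancerThread.start();
        //   Thread.sleep(SOME_DELAY_MS);
        //   runAndWatch(nnHost, "pkill -f Balancer"); // hypothetical kill command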
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_10
     * Restart rebalancing until it is done.
     */
    @Test
    public void testBalancerRestartInterruptedBalancerUntilDone()
            throws IOException {
        // need kill-restart thread
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_11
     * The same as _07 except that the namenode is shutdown while rebalancing.
     */
    @Test
    public void testBalancerTwoNodeSingleRackShutdownNameNodeDuringRebalance()
            throws IOException {
        // need NN shutdown thread in addition
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_12
     * The same as _05 except that FS writes occur during rebalancing.
     */
    @Test
    public void
    testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSWrites()
            throws IOException {
        // writer thread
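        // Untested sketch for the writer thread mentioned above: generate
        // filesystem load from a background thread while the balancer runs
        // (SOME_BLOCK_COUNT is a hypothetical value):
        //   Thread writer = new Thread(new Runnable() {
        //       public void run() { generateFileSystemLoad(SOME_BLOCK_COUNT); }
        //   });
        //   writer.start();
        //   runBalancerAndVerify(getAllDatanodes());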
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_13
     * The same as _05 except that FS deletes occur during rebalancing.
     */
    @Test
    public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletes()
            throws IOException {
        // eraser thread
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_14
     * The same as _05 except that FS deletes AND writes occur during
     * rebalancing.
     */
    @Test
    public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletesAndWrites()
            throws IOException {
        // writer & eraser threads
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_15
     * Scalability test: Populate a 750-node cluster, then
     *    1. Run rebalancing after 3 nodes are added
     *    2. Run rebalancing after 2 racks of nodes (60 nodes) are added
     *    3. Run rebalancing after 2 racks of nodes are added and concurrently
     *       executing file writing and deleting at the same time
     */
    @Test
    public void testBalancerScalability() throws IOException {
        /* work in progress->
         *
         *
        List<DNClient> dnList = getAllDatanodes();
        int dnCount = dnList.size();

        Assert.assertTrue(
                String.format(
                    "not enough datanodes available to run test,"
                    + " need 2 datanodes but have only %d available",
                    dnCount),
                ( dnCount == (875 - 2) ));

        List<DNClient> datanodes = reserveDatanodesForTest(750);
        shutdownNonTestNodes(datanodes);
        */
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_16
     * Start balancer with a negative threshold value.
     */
    @Test
    public void testBalancerConfiguredWithThresholdValueNegative()
            throws IOException {
        List<DNClient> testnodes = getAllDatanodes();
        final int TRIALS = 5;
        for (int i = 0; i < TRIALS; i++) {
            int negThreshold = (int) (-1 * 100 * Math.random());
            runBalancerAndVerify(testnodes, negThreshold);
        }
    }

    /**
     * Balancer_17
     * Start balancer with out-of-range threshold value
     *  (e.g. -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989)
     */
    @Test
    public void testBalancerConfiguredWithThresholdValueOutOfRange()
            throws IOException {
        List<DNClient> testnodes = getAllDatanodes();
        final int[] THRESHOLD_OUT_OF_RANGE_DATA = {
            -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989
        };
        for (int threshold : THRESHOLD_OUT_OF_RANGE_DATA) {
            runBalancerAndVerify(testnodes, threshold);
        }
    }

    /**
     * Balancer_18
     * Start balancer with alpha-numeric threshold value
     *  (e.g., 103dsf, asd234, asfd, ASD, #$asd, 2345&, $35, %34)
     */
    @Test
    public void testBalancerConfiguredWithThresholdValueAlphanumeric()
            throws IOException {
        List<DNClient> testnodes = getAllDatanodes();
        final String[] THRESHOLD_ALPHA_DATA = {
            "103dsf", "asd234", "asfd", "ASD", "#$asd", "2345&", "$35", "%34",
            "0x64", "0xde", "0xad", "0xbe", "0xef"
        };
        for (String threshold : THRESHOLD_ALPHA_DATA) {
            runBalancerAndVerify(testnodes, threshold);
        }
    }

    /**
     * Balancer_19
     * Start 2 instances of balancer on the same gateway
     */
    @Test
    public void testBalancerRunTwoConcurrentInstancesOnSingleGateway()
            throws IOException {
        // do on gateway logic with small balancer heap
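        // Untested sketch: start one balancer run from a background thread and
        // a second one from the test thread; since only one balancer may run
        // against a cluster at a time, the second instance is expected to exit
        // without rebalancing (the exact exit code is not asserted here):
        //   Thread first = new Thread(new Runnable() {
        //       public void run() { try { runBalancer(); } catch (IOException e) { } }
        //   });
        //   first.start();
        //   int secondExit = runBalancer();
        //   // expect secondExit != SUCCESS while the first instance is active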
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_20
     * Start 2 instances of balancer on two different gateways
     */
    @Test
    public void testBalancerRunTwoConcurrentInstancesOnDistinctGateways()
            throws IOException {
        // do on gateway logic with small balancer heap
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Balancer_21
     * Start balancer when the cluster is already balanced
     */
    @Test
    public void testBalancerOnBalancedCluster() throws IOException {
        // run balancer twice
        testBalancerSimple();
        testBalancerSimple();
    }

    /**
     * Balancer_22
     * Running the balancer with half the data nodes not running
     */
    @Test
    public void testBalancerWithOnlyHalfOfDataNodesRunning()
            throws IOException {
        List<DNClient> datanodes = getAllDatanodes();
        int testnodeCount = (int) Math.floor(datanodes.size() * 0.5);
        List<DNClient> testnodes = reserveDatanodesForTest(testnodeCount);
        runBalancerAndVerify(testnodes);
    }

    /**
     * Balancer_23
     * Running the balancer and simultaneously simulating load on the
     * cluster with half the data nodes not running.
     */
    @Test
    public void testBalancerOnBusyClusterWithOnlyHalfOfDatanodesRunning()
            throws IOException {
        // load thread
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * Protocol Test Prelude
     *
     * First set up a 3-node cluster with nodes NA, NB and NC, which are on
     * different racks. Then create a file with one block B with a replication
     * factor of 3. Finally add a new node ND to the cluster on the same rack as NC.
     */
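    // Untested sketch of the prelude described above, reusing helpers from this
    // class; rack placement for NA, NB and NC, and the later addition of ND on
    // the same rack as NC, still need cluster-side support not available here:
    //   List<DNClient> testnodes = reserveDatanodesForTest(3); // NA, NB, NC
    //   generateFileSystemLoad(1, (short) 3);                  // one block B, replication 3
    //   // then start a fourth node ND on the same rack as NC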

    /**
     * ProtocolTest_01
     * Copy block B from ND to NA with del hint NC
     */
    @Test
    public void
    testBlockReplacementProtocolFailWhenCopyBlockSourceDoesNotHaveBlockToCopy()
            throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * ProtocolTest_02
     * Copy block B from NA to NB with del hint NB
     */
    @Test
    public void
    testBlockReplacementProtocolFailWhenCopyBlockDestinationContainsBlockCopy()
            throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * ProtocolTest_03
     * Copy block B from NA to ND with del hint NB
     */
    @Test
    public void testBlockReplacementProtocolCopyBlock() throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * ProtocolTest_04
     * Copy block B from NB to NC with del hint NA
     */
    @Test
    public void testBlockReplacementProtocolWithInvalidHint()
            throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * ThrottleTest_01
     * Create a throttler with 1MB/s bandwidth. Send 6MB data, and throttle
     * at 0.5MB, 0.75MB, and in the end [1MB/s?].
     */

    /**
     * NamenodeProtocolTest_01
     * Get blocks from datanode 0 with a size of 2 blocks.
     */
    @Test
    public void testNamenodeProtocolGetBlocksCheckThroughput()
            throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * NamenodeProtocolTest_02
     * Get blocks from datanode 0 with a size of 1 block.
     */
    @Test
    public void testNamenodeProtocolGetSingleBlock()
            throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * NamenodeProtocolTest_03
     * Get blocks from datanode 0 with a size of 0.
     */
    @Test
    public void testNamenodeProtocolGetZeroBlocks() throws IOException {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * NamenodeProtocolTest_04
     * Get blocks from datanode 0 with a size of -1.
     */
    @Test
    public void testNamenodeProtocolGetMinusOneBlocks() throws Exception {
        throw new UnsupportedOperationException("not implemented yet!");
    }

    /**
     * NamenodeProtocolTest_05
     * Get blocks from a non-existent datanode.
     */
    @Test
    public void testNamenodeProtocolGetBlocksFromNonexistentDatanode()
            throws IOException {
        final short replication = 1;
        Path balancerTempDir = null;
        try {
            // reserve 2 nodes for test
            List<DNClient> testnodes = reserveDatanodesForTest(2);
            shutdownNonTestNodes(testnodes);

            DNClient testnode1 = testnodes.get(0);
            DNClient testnode2 = testnodes.get(1);

            // write some blocks with replication factor of 1
            balancerTempDir = makeTempDir();
            generateFileSystemLoad(20, replication);

            // get block locations from NN
            NNClient namenode = dfsCluster.getNNClient();
            // TODO extend namenode to get block locations
            //namenode.get

            // shutdown 1 node
            stopDatanode(testnode1);

            // attempt to retrieve blocks from the dead node
            // we should fail
        } finally {
            // finally block to run cleanup
            LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
            try {
                deleteTempDir(balancerTempDir);
            } catch (Exception e) {
                LOG.warn("problem cleaning up temp dir", e);
            }
        }
    }
}