1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import java.io.BufferedReader; 21 import java.io.ByteArrayInputStream; 22 import java.io.ByteArrayOutputStream; 23 import java.io.FileNotFoundException; 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.OutputStream; 28 import java.io.PrintStream; 29 import java.net.URI; 30 31 import java.security.SecureRandom; 32 33 import java.util.HashMap; 34 import java.util.HashSet; 35 import java.util.Iterator; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.Set; 40 41 import org.apache.commons.lang.ArrayUtils; 42 43 import org.apache.commons.logging.Log; 44 import org.apache.commons.logging.LogFactory; 45 46 import org.apache.hadoop.conf.Configuration; 47 import org.apache.hadoop.fs.FileStatus; 48 import org.apache.hadoop.fs.FileSystem; 49 import org.apache.hadoop.fs.Path; 50 51 import org.apache.hadoop.hdfs.test.system.HDFSCluster; 52 import org.apache.hadoop.hdfs.test.system.NNClient; 53 import org.apache.hadoop.hdfs.test.system.DNClient; 54 55 56 import org.apache.hadoop.io.IOUtils; 57 import org.apache.hadoop.util.Progressable; 58 59 import org.junit.After; 60 import org.junit.Assert; 61 import org.junit.Before; 62 import org.junit.Test; 63 import org.mortbay.util.ajax.JSON; 64 65 public class TestBalancer { 66 67 private static final Log LOG = LogFactory.getLog(TestBalancer.class); 68 private static final String BALANCER_TEMP_DIR = "balancer-temp"; 69 private Configuration hadoopConf; 70 private HDFSCluster dfsCluster; 71 TestBalancer()72 public TestBalancer() throws Exception { 73 } 74 75 @Before setUp()76 public void setUp() throws Exception { 77 hadoopConf = new Configuration(); 78 dfsCluster = HDFSCluster.createCluster(hadoopConf); 79 dfsCluster.setUp(); 80 } 81 82 @After tearDown()83 public void tearDown() throws Exception { 84 dfsCluster.tearDown(); 85 } 86 87 // Trivial @Test testNamenodePing()88 public void testNamenodePing() throws IOException { 89 LOG.info("testing filesystem ping"); 90 NNClient namenode = dfsCluster.getNNClient(); 91 namenode.ping(); 92 LOG.info("done."); 93 } 94 95 // Trivial @Test testNamenodeConnectDisconnect()96 public void testNamenodeConnectDisconnect() throws IOException { 97 LOG.info("connecting to namenode"); 98 NNClient namenode = dfsCluster.getNNClient(); 99 namenode.connect(); 100 LOG.info("done."); 101 LOG.info("disconnecting from namenode"); 102 namenode.disconnect(); 103 } 104 105 /** 106 * The basic scenario for balancer test is as follows 107 * 108 * - Bring up cluster with 1 DataNode 109 * - Load DataNode to >50% 110 * - Count files/blocks on DataNode 111 * - Add new, empty DataNode to cluster 112 * - Run Balancer 113 * - Count files/blocks on DataNodes 114 * - Blocks counts from before and after Balancer run should be consistent 115 * 116 */ 117 @Test testBalancerBasicScenario()118 public void testBalancerBasicScenario() throws IOException { 119 Path balancerTempDir = null; 120 try { 121 List<DNClient> testnodes = reserveDatanodesForTest(2); 122 DNClient testnode1 = testnodes.get(0); 123 DNClient testnode2 = testnodes.get(1); 124 shutdownNonTestNodes(testnodes); 125 126 LOG.info("attempting to kill both test nodes"); 127 stopDatanode(testnode1); 128 stopDatanode(testnode2); 129 130 LOG.info("starting up datanode ["+ 131 testnode1.getHostName()+ 132 "] and loading it with data"); 133 startDatanode(testnode1); 134 135 // mkdir balancer-temp 136 balancerTempDir = makeTempDir(); 137 // write 2 blocks to file system 138 LOG.info("generating filesystem load"); 139 // TODO spec blocks to generate by blockCount, blockSize, # of writers 140 generateFileSystemLoad(2); // generate 2 blocks of test data 141 142 LOG.info("measure space used on 1st node"); 143 long usedSpace0 = getDatanodeUsedSpace(testnode1); 144 LOG.info("datanode " + testnode1.getHostName() 145 + " contains " + usedSpace0 + " bytes"); 146 147 LOG.info("bring up a 2nd node and run balancer on DFS"); 148 startDatanode(testnode2); 149 runBalancerAndVerify(testnodes); 150 } catch (Throwable t) { 151 LOG.info("method testBalancer failed", t); 152 } finally { 153 // finally block to run cleanup 154 LOG.info("clean off test data from DFS [rmr ~/balancer-temp]"); 155 try { 156 deleteTempDir(balancerTempDir); 157 } catch (Exception e) { 158 LOG.warn("problem cleaning up temp dir", e); 159 } 160 161 // restart killed nodes 162 Iterator<DNClient> iter = dfsCluster.getDNClients().iterator(); 163 164 while (iter.hasNext()) { 165 DNClient dn = iter.next(); 166 startDatanode( dn ); 167 } 168 } 169 } 170 shutdownNonTestNodes(List<DNClient> testnodes)171 private void shutdownNonTestNodes(List<DNClient> testnodes) { 172 Set killSet = new HashSet(getAllDatanodes()); 173 killSet.removeAll(testnodes); 174 LOG.info("attempting to kill/suspend all the nodes not used for this test"); 175 Iterator<DNClient> iter = killSet.iterator(); 176 DNClient dn = null; 177 while (iter.hasNext()) { 178 dn = iter.next(); 179 // kill may not work with some secure-HDFS configs, 180 // so using our stopDataNode() method 181 stopDatanode(dn); 182 } 183 } 184 185 /** 186 * Kill all datanodes but leave reservationCount nodes alive, 187 * return a list of the reserved datanodes 188 */ reserveDatanodesForTest(int reservationCount)189 private List<DNClient> reserveDatanodesForTest(int reservationCount) { 190 List<DNClient> testDNs = new LinkedList<DNClient>(); 191 List<DNClient> dieDNs = new LinkedList<DNClient>(); 192 LOG.info("getting collection of live data nodes"); 193 List<DNClient> dnList = getAllDatanodes(); 194 int dnCount = dnList.size(); 195 // check to make sure there is enough capacity on these nodes to run test 196 Assert.assertTrue( 197 String.format( 198 "not enough datanodes available to run test," 199 + " need %d datanodes but have only %d available", 200 reservationCount, dnCount), 201 ( dnCount >= reservationCount )); 202 LOG.info("selecting "+reservationCount+" nodes for test"); 203 dieDNs = new LinkedList<DNClient>(dnList); 204 testDNs = new LinkedList<DNClient>(); 205 206 final int LEN = dnCount - 1; 207 int i = getRandom(LEN); 208 DNClient testDN = dieDNs.get(i); 209 testDNs.add(testDN); 210 dieDNs.remove(testDN); 211 int j = i; 212 do { 213 i = getRandom(LEN); 214 } while (i != j); 215 testDN = dieDNs.get(i); 216 testDNs.add(testDN); 217 dieDNs.remove(testDN); 218 219 LOG.info("nodes reserved for test"); 220 printDatanodeList(testDNs); 221 222 LOG.info("nodes not used in test"); 223 printDatanodeList(dieDNs); 224 225 return testDNs; 226 } 227 getAllDatanodes()228 private List<DNClient> getAllDatanodes() { 229 return dfsCluster.getDNClients(); 230 } 231 232 private final static DNClient[] DATANODE_ARRAY = {}; toDatanodeArray(List<DNClient> datanodeList)233 private DNClient[] toDatanodeArray(List<DNClient> datanodeList) { 234 return (DNClient[]) datanodeList.toArray(DATANODE_ARRAY); 235 } 236 237 /** 238 * Return a random number between 0 and N inclusive. 239 * 240 * @param int n 241 * @param n max number to return 242 * @return random integer between 0 and N 243 */ getRandom(int n)244 private int getRandom(int n) { 245 return (int) (n * Math.random()); 246 } 247 248 /** 249 * Calculate if the error in expected and observed values is within tolerance 250 * 251 * @param expectedValue expected value of experiment 252 * @param observedValue observed value of experiment 253 * @param tolerance per cent tolerance for error, represented as a int 254 */ withinTolerance(long expectedValue, long observedValue, int tolerance)255 private boolean withinTolerance(long expectedValue, 256 long observedValue, 257 int tolerance) { 258 double diff = 1.0 * Math.abs(observedValue - expectedValue); 259 double thrs = expectedValue * (tolerance/100); 260 return diff > thrs; 261 } 262 263 // emulate tolerance calculation in balancer code 264 public final static int DEFAULT_TOLERANCE = 10; // 10% isClusterBalanced(DNClient[] datanodes)265 protected boolean isClusterBalanced(DNClient[] datanodes) throws IOException { 266 return isClusterBalanced(datanodes, DEFAULT_TOLERANCE); 267 } isClusterBalanced(DNClient[] datanodes, int tolerance)268 protected boolean isClusterBalanced(DNClient[] datanodes, int tolerance) 269 throws IOException { 270 271 Assert.assertFalse("empty datanode array specified", 272 ArrayUtils.isEmpty(datanodes)); 273 boolean result = true; 274 double[] utilizationByNode = new double[ datanodes.length ]; 275 double totalUsedSpace = 0L; 276 double totalCapacity = 0L; 277 Map datanodeVolumeMap = new HashMap(); 278 // accumulate space stored on each node 279 for(int i=0; i<datanodes.length; i++) { 280 DNClient datanode = datanodes[i]; 281 Map volumeInfoMap = getDatanodeVolumeAttributes(datanode); 282 long usedSpace = (Long)volumeInfoMap.get(ATTRNAME_USED_SPACE); 283 long capacity = (Long)volumeInfoMap.get(ATTRNAME_CAPACITY ); 284 utilizationByNode[i] = ( ((double)usedSpace)/capacity ) * 100; 285 totalUsedSpace += usedSpace; 286 totalCapacity += capacity; 287 } 288 // here we are reusing previously fetched volume-info, for speed 289 // an alternative is to get fresh values from the cluster here instead 290 double avgUtilization = ( totalUsedSpace/totalCapacity ) * 100; 291 for(int i=0; i<datanodes.length; i++) { 292 double varUtilization = Math.abs(avgUtilization - utilizationByNode[i]); 293 if(varUtilization > tolerance) { 294 result = false; 295 break; 296 } 297 } 298 299 return result; 300 } 301 302 /** 303 * Make a working directory for storing temporary files 304 * 305 * @throws IOException 306 */ makeTempDir()307 private Path makeTempDir() throws IOException { 308 Path temp = new Path(BALANCER_TEMP_DIR); 309 FileSystem srcFs = temp.getFileSystem(hadoopConf); 310 FileStatus fstatus = null; 311 try { 312 fstatus = srcFs.getFileStatus(temp); 313 if (fstatus.isDir()) { 314 LOG.warn(BALANCER_TEMP_DIR + ": File exists"); 315 } else { 316 LOG.warn(BALANCER_TEMP_DIR + " exists but is not a directory"); 317 } 318 deleteTempDir(temp); 319 } catch (FileNotFoundException fileNotFoundExc) { 320 } finally { 321 if (!srcFs.mkdirs(temp)) { 322 throw new IOException("failed to create " + BALANCER_TEMP_DIR); 323 } 324 } 325 return temp; 326 } 327 328 /** 329 * Remove the working directory used to store temporary files 330 * 331 * @param temp 332 * @throws IOException 333 */ deleteTempDir(Path temp)334 private void deleteTempDir(Path temp) throws IOException { 335 FileSystem srcFs = temp.getFileSystem(hadoopConf); 336 LOG.info("attempting to delete path " + temp + "; this path exists? -> " + srcFs.exists(temp)); 337 srcFs.delete(temp, true); 338 } 339 printDatanodeList(List<DNClient> lis)340 private void printDatanodeList(List<DNClient> lis) { 341 for (DNClient datanode : lis) { 342 LOG.info("\t" + datanode.getHostName()); 343 } 344 } 345 346 private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin"; stopDatanode(DNClient dn)347 private void stopDatanode(DNClient dn) { 348 String dnHost = dn.getHostName(); 349 runAndWatch(dnHost, CMD_STOP_DN); 350 } 351 private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin"; startDatanode(DNClient dn)352 private void startDatanode(DNClient dn) { 353 String dnHost = dn.getHostName(); 354 runAndWatch(dnHost, CMD_START_DN); 355 } 356 357 /* using "old" default block size of 64M */ 358 private static final int DFS_BLOCK_SIZE = 67108864; 359 private static final short DEFAULT_REPLICATION = 3; generateFileSystemLoad(long numBlocks)360 private void generateFileSystemLoad(long numBlocks) { 361 generateFileSystemLoad(numBlocks, DEFAULT_REPLICATION); 362 } generateFileSystemLoad(long numBlocks, short replication)363 private void generateFileSystemLoad(long numBlocks, short replication) { 364 String destfile = "hdfs:///user/hadoopqa/";// + BALANCER_TEMP_DIR + "/LOADGEN.DAT"; 365 SecureRandom randgen = new SecureRandom(); 366 ByteArrayOutputStream dat = null; 367 ByteArrayInputStream in = null; 368 final int CHUNK = 4096; 369 final Configuration testConf = new Configuration(hadoopConf); 370 try { 371 testConf.setInt("dfs.replication", replication); 372 for (int i = 0; i < numBlocks; i++) { 373 FileSystem fs = FileSystem.get( 374 URI.create(destfile), testConf); 375 OutputStream out = fs.create( 376 new Path(destfile), 377 replication, 378 new ProgressReporter()); 379 dat = new ByteArrayOutputStream(DFS_BLOCK_SIZE); 380 for (int z = 0; z < DFS_BLOCK_SIZE; z += CHUNK) { 381 byte[] bytes = new byte[CHUNK]; 382 randgen.nextBytes(bytes); 383 dat.write(bytes, 0, CHUNK); 384 } 385 386 in = new ByteArrayInputStream(dat.toByteArray()); 387 IOUtils.copyBytes(in, out, CHUNK, true); 388 LOG.info("wrote block " + (i + 1) + " of " + numBlocks); 389 } 390 } catch (IOException ioExc) { 391 LOG.warn("f/s loadgen failed!", ioExc); 392 } finally { 393 try { 394 dat.close(); 395 } catch (Exception e) { 396 } 397 try { 398 in.close(); 399 } catch (Exception e) { 400 } 401 } 402 } 403 // TODO this should be taken from the environment 404 public final static String HADOOP_HOME = "/grid/0/gs/gridre/yroot.biga/share/hadoop-current"; 405 public final static String CMD_SSH = "/usr/bin/ssh"; 406 public final static String CMD_KINIT = "/usr/kerberos/bin/kinit"; 407 public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop"; 408 public final static String OPT_BALANCER = "balancer"; 409 public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab"; 410 public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM"; 411 412 public final static int DEFAULT_THRESHOLD = 10; runBalancer()413 private int runBalancer() throws IOException { 414 return runBalancer(DEFAULT_THRESHOLD); 415 } 416 runBalancer(int threshold)417 private int runBalancer(int threshold) throws IOException { 418 return runBalancer(""+threshold); 419 } 420 /* 421 * TODO change the heap size balancer uses so it can run on gateways 422 * i.e., 14G heap is too big for gateways 423 */ runBalancer(String threshold)424 private int runBalancer(String threshold) 425 throws IOException { 426 427 String balancerCommand = String.format("\"%s -k -t %s %s; %s %s -threshold %s", 428 CMD_KINIT, 429 KERB_KEYTAB, 430 KERB_PRINCIPAL, 431 CMD_HADOOP, 432 OPT_BALANCER, 433 threshold); 434 String nnHost = dfsCluster.getNNClient().getHostName(); 435 return runAndWatch(nnHost, balancerCommand); 436 } runBalancerAndVerify(List<DNClient> testnodes)437 private void runBalancerAndVerify(List<DNClient> testnodes) 438 throws IOException { 439 runBalancerAndVerify(testnodes, DEFAULT_THRESHOLD); 440 } runBalancerAndVerify(List<DNClient> testnodes, int threshold)441 private void runBalancerAndVerify(List<DNClient> testnodes, int threshold) 442 throws IOException { 443 runBalancerAndVerify(testnodes, ""+DEFAULT_THRESHOLD); 444 } runBalancerAndVerify(List<DNClient> testnodes, String threshold)445 private void runBalancerAndVerify(List<DNClient> testnodes, String threshold) 446 throws IOException { 447 int exitStatus = runBalancer(threshold); 448 // assert balancer exits with status SUCCESSe 449 Assert.assertTrue( 450 String.format("balancer returned non-success exit code: %d", 451 exitStatus), 452 (exitStatus == SUCCESS)); 453 DNClient[] testnodeArr = toDatanodeArray(testnodes); 454 Assert.assertTrue( 455 "cluster is not balanced", 456 isClusterBalanced(testnodeArr)); 457 } 458 runAndWatch(String remoteHost, String remoteCommand)459 private int runAndWatch(String remoteHost, String remoteCommand) { 460 int exitStatus = -1; 461 try { 462 Process proc = new ProcessBuilder(CMD_SSH, remoteHost, remoteCommand).start(); 463 watchProcStream(proc.getInputStream(), System.out); 464 watchProcStream(proc.getErrorStream(), System.err); 465 exitStatus = proc.waitFor(); 466 } catch(InterruptedException intExc) { 467 LOG.warn("got thread interrupt error", intExc); 468 } catch(IOException ioExc) { 469 LOG.warn("got i/o error", ioExc); 470 } 471 return exitStatus; 472 } 473 watchProcStream(InputStream in, PrintStream out)474 private void watchProcStream(InputStream in, PrintStream out) { 475 new Thread(new StreamWatcher(in, out)).start(); 476 } 477 private static final String DATANODE_VOLUME_INFO = "VolumeInfo"; 478 private static final String ATTRNAME_USED_SPACE = "usedSpace"; 479 private static final String ATTRNAME_FREE_SPACE = "freeSpace"; 480 // pseudo attribute, JMX doesn't really provide this 481 private static final String ATTRNAME_CAPACITY = "capacity"; 482 // TODO maybe the static methods below belong in some utility class... getDatanodeUsedSpace(DNClient datanode)483 private static long getDatanodeUsedSpace(DNClient datanode) 484 throws IOException { 485 return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_USED_SPACE); 486 }/* 487 private static long getDatanodeFreeSpace(DNClient datanode) 488 throws IOException { 489 return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_FREE_SPACE); 490 }*/ getDatanodeVolumeAttributes(DNClient datanode)491 private static Map getDatanodeVolumeAttributes(DNClient datanode) 492 throws IOException { 493 Map result = new HashMap(); 494 long usedSpace = getVolumeAttribute(datanode, ATTRNAME_USED_SPACE); 495 long freeSpace = getVolumeAttribute(datanode, ATTRNAME_FREE_SPACE); 496 result.put(ATTRNAME_USED_SPACE, usedSpace); 497 result.put(ATTRNAME_CAPACITY, usedSpace+freeSpace); 498 return result; 499 } 500 getVolumeAttribute(DNClient datanode, String attribName)501 private static long getVolumeAttribute(DNClient datanode, 502 String attribName) 503 throws IOException { 504 505 Object volInfo = datanode.getDaemonAttribute(DATANODE_VOLUME_INFO); 506 Assert 507 .assertNotNull( String 508 .format( "Attribute \"%s\" should be non-null", 509 DATANODE_VOLUME_INFO ), 510 volInfo ); 511 String strVolInfo = volInfo.toString(); 512 LOG.debug( String.format("Value of %s: %s", 513 DATANODE_VOLUME_INFO, 514 strVolInfo) ); 515 Map volInfoMap = (Map) JSON.parse(strVolInfo); 516 long attrVal = 0L; 517 for(Object key: volInfoMap.keySet()) { 518 Map attrMap = (Map) volInfoMap.get(key); 519 long val = (Long) attrMap.get(attribName); 520 attrVal += val; 521 } 522 return attrVal; 523 524 } 525 /** simple utility to watch streams from an exec'ed process */ 526 static class StreamWatcher implements Runnable { 527 528 private BufferedReader reader; 529 private PrintStream printer; 530 StreamWatcher(InputStream in, PrintStream out)531 StreamWatcher(InputStream in, PrintStream out) { 532 reader = getReader(in); 533 printer = out; 534 } 535 getReader(InputStream in)536 private static BufferedReader getReader(InputStream in) { 537 return new BufferedReader(new InputStreamReader(in)); 538 } 539 run()540 public void run() { 541 try { 542 if (reader.ready()) { 543 printer.println(reader.readLine()); 544 } 545 } catch (IOException ioExc) { 546 } 547 } 548 } 549 550 /** simple utility to report progress in generating data */ 551 static class ProgressReporter implements Progressable { 552 553 StringBuffer buf = null; 554 progress()555 public void progress() { 556 if (buf == null) { 557 buf = new StringBuffer(); 558 } 559 buf.append("."); 560 if (buf.length() == 10000) { 561 LOG.info(".........."); 562 buf = null; 563 } 564 } 565 } 566 567 // A constant for SUCCESS exit code 568 static final int SUCCESS = 1; 569 570 /** 571 * Balancer_01 572 * Start balancer and check if the cluster is balanced after the run. 573 * Cluster should end up in balanced state. 574 */ 575 @Test testBalancerSimple()576 public void testBalancerSimple() throws IOException { 577 578 DNClient[] datanodes = toDatanodeArray( getAllDatanodes() ); 579 int exitStatus = runBalancer(); 580 // assert on successful exit code here 581 Assert.assertTrue( 582 String.format("balancer returned non-success exit code: %d", 583 exitStatus), 584 (exitStatus == SUCCESS)); 585 Assert.assertTrue( "cluster is not balanced", isClusterBalanced(datanodes) ); 586 587 } 588 589 /** 590 * Balancer_02 591 * Test a cluster with even distribution, then a new empty node is 592 * added to the cluster. Here, even distribution effectively means the 593 * cluster is in "balanced" state, as bytes consumed for block allocation 594 * are evenly distributed throughout the cluster. 595 */ 596 @Test testBalancerEvenDistributionWithNewNodeAdded()597 public void testBalancerEvenDistributionWithNewNodeAdded() throws IOException { 598 throw new UnsupportedOperationException("not implemented yet!"); 599 600 // get all nodes 601 // need to get an external reserve of nodes we can boot up 602 // to add to this cluster? 603 // HOW? 604 605 // IDEA try to steal some nodes from omega-M for now..... 606 // hmmm also need a way to give an alternate "empty-node" config 607 // to "hide" the data that may already exist on this node 608 } 609 610 /** 611 * Balancer_03 612 * Bring up a 1-node DFS cluster. Set files replication factor to be 1 613 * and fill up the node to 30% full. Then add an empty datanode. 614 */ 615 @Test testBalancerSingleNodeClusterWithNewNodeAdded()616 public void testBalancerSingleNodeClusterWithNewNodeAdded() throws IOException { 617 // empty datanode: mod config to point to non-default blocks dir. 618 // limit capacity to available storage space 619 throw new UnsupportedOperationException("not implemented yet!"); 620 } 621 622 /** 623 * Balancer_04 624 * The same as _03 except that the empty new data node is on a 625 * different rack. 626 */ 627 @Test testBalancerSingleNodeClusterWithNewNodeAddedFromDifferentRack()628 public void testBalancerSingleNodeClusterWithNewNodeAddedFromDifferentRack() 629 throws IOException { 630 // need rack awareness 631 throw new UnsupportedOperationException("not implemented yet!"); 632 } 633 634 /** 635 * Balancer_05 636 * The same as _03 except that the empty new data node is half the 637 * capacity as the old one. 638 */ 639 @Test testBalancerSingleNodeClusterWithHalfCapacityNewNode()640 public void testBalancerSingleNodeClusterWithHalfCapacityNewNode() { 641 // how to limit node capacity? 642 throw new UnsupportedOperationException("not implemented yet!"); 643 } 644 645 /** 646 * Balancer_06 647 * Bring up a 2-node cluster and fill one node to be 60% and the 648 * other to be 10% full. All nodes are on different racks. 649 */ 650 @Test testBalancerTwoNodeMultiRackCluster()651 public void testBalancerTwoNodeMultiRackCluster() { 652 // need rack awareness 653 throw new UnsupportedOperationException("not implemented yet!"); 654 } 655 656 /** 657 * Balancer_07 658 * Bring up a dfs cluster with nodes A and B. Set file replication 659 * factor to be 2 and fill up the cluster to 30% full. Then add an 660 * empty data node C. All three nodes are on the same rack. 661 */ 662 @Test testBalancerTwoNodeSingleRackClusterWuthNewNodeAdded()663 public void testBalancerTwoNodeSingleRackClusterWuthNewNodeAdded() 664 throws IOException { 665 666 final short TEST_REPLICATION_FACTOR = 3; 667 List<DNClient> testnodes = reserveDatanodesForTest(3); 668 DNClient dnA = testnodes.get(0); 669 DNClient dnB = testnodes.get(1); 670 671 DNClient dnC = testnodes.get(2); 672 stopDatanode(dnC); 673 674 // change test: 30% full-er (ie, 30% over pre-test capacity), 675 // use most heavily node as baseline 676 long targetLoad = (long) ( 677 (1/DFS_BLOCK_SIZE) * 678 0.30 * 679 Math.max( getDatanodeUsedSpace(dnA), getDatanodeUsedSpace(dnB) ) ); 680 generateFileSystemLoad(targetLoad, TEST_REPLICATION_FACTOR); 681 startDatanode(dnC); 682 runBalancerAndVerify(testnodes); 683 } 684 685 /** 686 * Balancer_08 687 * The same as _07 except that A, B and C are on different racks. 688 */ 689 @Test testBalancerTwoNodeMultiRackClusterWithNewNodeAdded()690 public void testBalancerTwoNodeMultiRackClusterWithNewNodeAdded() 691 throws IOException { 692 // need rack awareness 693 throw new UnsupportedOperationException("not implemented yet!"); 694 } 695 696 /** 697 * Balancer_09 698 * The same as _07 except that interrupt balancing. 699 */ 700 @Test testBalancerTwoNodeSingleRackClusterInterruptingRebalance()701 public void testBalancerTwoNodeSingleRackClusterInterruptingRebalance() 702 throws IOException { 703 // interrupt thread 704 throw new UnsupportedOperationException("not implemented yet!"); 705 } 706 707 /** 708 * Balancer_10 709 * Restart rebalancing until it is done. 710 */ 711 @Test testBalancerRestartInterruptedBalancerUntilDone()712 public void testBalancerRestartInterruptedBalancerUntilDone() 713 throws IOException { 714 // need kill-restart thread 715 throw new UnsupportedOperationException("not implemented yet!"); 716 } 717 718 /** 719 * Balancer_11 720 * The same as _07 except that the namenode is shutdown while rebalancing. 721 */ 722 @Test testBalancerTwoNodeSingleRackShutdownNameNodeDuringRebalance()723 public void testBalancerTwoNodeSingleRackShutdownNameNodeDuringRebalance() 724 throws IOException { 725 // need NN shutdown thread in addition 726 throw new UnsupportedOperationException("not implemented yet!"); 727 } 728 729 /** 730 * Balancer_12 731 * The same as _05 except that FS writes occur during rebalancing. 732 */ 733 @Test 734 public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSWrites()735 testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSWrites() 736 throws IOException { 737 // writer thread 738 throw new UnsupportedOperationException("not implemented yet!"); 739 } 740 741 /** 742 * Balancer_13 743 * The same as _05 except that FS deletes occur during rebalancing. 744 */ 745 @Test testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletes()746 public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletes() 747 throws IOException { 748 // eraser thread 749 throw new UnsupportedOperationException("not implemented yet!"); 750 } 751 752 /** 753 * Balancer_14 754 * The same as _05 except that FS deletes AND writes occur during 755 * rebalancing. 756 */ 757 @Test testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletesAndWrites()758 public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletesAndWrites() 759 throws IOException { 760 // writer & eraser threads 761 throw new UnsupportedOperationException("not implemented yet!"); 762 } 763 764 /** 765 * Balancer_15 766 * Scalability test: Populate a 750-node cluster, then 767 * 1. Run rebalancing after 3 nodes are added 768 * 2. Run rebalancing after 2 racks of nodes (60 nodes) are added 769 * 3. Run rebalancing after 2 racks of nodes are added and concurrently 770 * executing file writing and deleting at the same time 771 */ 772 @Test testBalancerScalability()773 public void testBalancerScalability() throws IOException { 774 /* work in progress-> 775 * 776 * 777 List<DNClient> dnList = getAllDatanodes(); 778 int dnCount = dnList.size(); 779 780 Assert.assertTrue( 781 String.format( 782 "not enough datanodes available to run test," 783 + " need 2 datanodes but have only %d available", 784 dnCount), 785 ( dnCount == (875 - 2) )); 786 787 List<DNClient> datanodes = reserveDatanodesForTest(750); 788 shutdownNonTestNodes(datanodes); 789 */ 790 throw new UnsupportedOperationException("not implemented yet!"); 791 } 792 793 /** 794 * Balancer_16 795 * Start balancer with a negative threshold value. 796 */ 797 @Test testBalancerConfiguredWithThresholdValueNegative()798 public void testBalancerConfiguredWithThresholdValueNegative() 799 throws IOException { 800 List<DNClient> testnodes = getAllDatanodes(); 801 final int TRIALS=5; 802 for(int i=0; i<TRIALS; i++) { 803 int negThreshold = (int)(-1 * 100 * Math.random()); 804 runBalancerAndVerify(testnodes, negThreshold); 805 } 806 } 807 808 /** 809 * Balancer_17 810 * Start balancer with out-of-range threshold value 811 * (e.g. -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989) 812 */ 813 @Test testBalancerConfiguredWithThresholdValueOutOfRange()814 public void testBalancerConfiguredWithThresholdValueOutOfRange() 815 throws IOException { 816 List<DNClient> testnodes = getAllDatanodes(); 817 final int[] THRESHOLD_OUT_OF_RANGE_DATA = { 818 -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989 819 }; 820 for(int threshold: THRESHOLD_OUT_OF_RANGE_DATA) { 821 runBalancerAndVerify(testnodes, threshold); 822 } 823 } 824 825 /** 826 * Balancer_18 827 * Start balancer with alpha-numeric threshold value 828 * (e.g., 103dsf, asd234, asfd, ASD, #$asd, 2345&, $35, %34) 829 */ 830 @Test testBalancerConfiguredWithThresholdValueAlphanumeric()831 public void testBalancerConfiguredWithThresholdValueAlphanumeric() 832 throws IOException { 833 List<DNClient> testnodes = getAllDatanodes(); 834 final String[] THRESHOLD_ALPHA_DATA = { 835 "103dsf", "asd234", "asfd", "ASD", "#$asd", "2345&", "$35", "%34", 836 "0x64", "0xde", "0xad", "0xbe", "0xef" 837 }; 838 for(String threshold: THRESHOLD_ALPHA_DATA) { 839 runBalancerAndVerify(testnodes,threshold); 840 } 841 } 842 843 /** 844 * Balancer_19 845 * Start 2 instances of balancer on the same gateway 846 */ 847 @Test testBalancerRunTwoConcurrentInstancesOnSingleGateway()848 public void testBalancerRunTwoConcurrentInstancesOnSingleGateway() 849 throws IOException { 850 // do on gateway logic with small balancer heap 851 throw new UnsupportedOperationException("not implemented yet!"); 852 } 853 854 /** 855 * Balancer_20 856 * Start 2 instances of balancer on two different gateways 857 */ 858 @Test testBalancerRunTwoConcurrentInstancesOnDistinctGateways()859 public void testBalancerRunTwoConcurrentInstancesOnDistinctGateways() 860 throws IOException { 861 // do on gateway logic with small balancer heap 862 throw new UnsupportedOperationException("not implemented yet!"); 863 } 864 865 /** 866 * Balancer_21 867 * Start balancer when the cluster is already balanced 868 */ 869 @Test testBalancerOnBalancedCluster()870 public void testBalancerOnBalancedCluster() throws IOException { 871 // run balancer twice 872 testBalancerSimple(); 873 testBalancerSimple(); 874 } 875 876 /** 877 * Balancer_22 878 * Running the balancer with half the data nodes not running 879 */ 880 @Test testBalancerWithOnlyHalfOfDataNodesRunning()881 public void testBalancerWithOnlyHalfOfDataNodesRunning() 882 throws IOException { 883 List<DNClient> datanodes = getAllDatanodes(); 884 int testnodeCount = (int)Math.floor(datanodes.size() * 0.5); 885 List<DNClient> testnodes = reserveDatanodesForTest(testnodeCount); 886 runBalancerAndVerify(testnodes); 887 } 888 889 /** 890 * Balancer_23 891 * Running the balancer and simultaneously simulating load on the 892 * cluster with half the data nodes not running. 893 */ 894 @Test testBalancerOnBusyClusterWithOnlyHalfOfDatanodesRunning()895 public void testBalancerOnBusyClusterWithOnlyHalfOfDatanodesRunning() 896 throws IOException { 897 // load thread 898 throw new UnsupportedOperationException("not implemented yet!"); 899 } 900 901 /** 902 * Protocol Test Prelude 903 * 904 * First set up 3 node cluster with nodes NA, NB and NC, which are on 905 * different racks. Then create a file with one block B with a replication 906 * factor 3. Finally add a new node ND to the cluster on the same rack as NC. 907 */ 908 909 /** 910 * ProtocolTest_01 911 * Copy block B from ND to NA with del hint NC 912 */ 913 @Test 914 public void testBlockReplacementProtocolFailWhenCopyBlockSourceDoesNotHaveBlockToCopy()915 testBlockReplacementProtocolFailWhenCopyBlockSourceDoesNotHaveBlockToCopy() 916 throws IOException { 917 throw new UnsupportedOperationException("not implemented yet!"); 918 } 919 920 /* 921 * ProtocolTest_02 922 * Copy block B from NA to NB with del hint NB 923 */ 924 @Test 925 public void testBlockReplacementProtocolFailWhenCopyBlockDestinationContainsBlockCopy()926 testBlockReplacementProtocolFailWhenCopyBlockDestinationContainsBlockCopy() 927 throws IOException { 928 throw new UnsupportedOperationException("not implemented yet!"); 929 } 930 931 /** 932 * ProtocolTest_03 933 * Copy block B from NA to ND with del hint NB 934 */ 935 @Test testBlockReplacementProtocolCopyBlock()936 public void testBlockReplacementProtocolCopyBlock() throws IOException { 937 throw new UnsupportedOperationException("not implemented yet!"); 938 } 939 940 /** 941 * ProtocolTest_04 942 * Copy block B from NB to NC with del hint NA 943 */ 944 @Test testBlockReplacementProtocolWithInvalidHint()945 public void testBlockReplacementProtocolWithInvalidHint() 946 throws IOException { 947 throw new UnsupportedOperationException("not implemented yet!"); 948 } 949 950 /** 951 * ThrottleTest_01 952 * Create a throttler with 1MB/s bandwidth. Send 6MB data, and throttle 953 * at 0.5MB, 0.75MB, and in the end [1MB/s?]. 954 */ 955 956 /** 957 * NamenodeProtocolTest_01 958 * Get blocks from datanode 0 with a size of 2 blocks. 959 */ 960 @Test testNamenodeProtocolGetBlocksCheckThroughput()961 public void testNamenodeProtocolGetBlocksCheckThroughput() 962 throws IOException { 963 throw new UnsupportedOperationException("not implemented yet!"); 964 } 965 966 /** 967 * NamenodeProtocolTest_02 968 * Get blocks from datanode 0 with a size of 1 block. 969 */ 970 @Test testNamenodeProtocolGetSingleBlock()971 public void testNamenodeProtocolGetSingleBlock() 972 throws IOException { 973 throw new UnsupportedOperationException("not implemented yet!"); 974 } 975 976 /** 977 * NamenodeProtocolTest_03 978 * Get blocks from datanode 0 with a size of 0. 979 */ 980 @Test testNamenodeProtocolGetZeroBlocks()981 public void testNamenodeProtocolGetZeroBlocks() throws IOException { 982 throw new UnsupportedOperationException("not implemented yet!"); 983 } 984 /** 985 * NamenodeProtocolTest_04 986 * Get blocks from datanode 0 with a size of -1. 987 */ 988 @Test testNamenodeProtocolGetMinusOneBlocks()989 public void testNamenodeProtocolGetMinusOneBlocks() throws Exception { 990 991 } 992 993 /** 994 * NamenodeProtocolTest_05 995 * Get blocks from a non-existent datanode. 996 */ 997 @Test testNamenodeProtocolGetBlocksFromNonexistentDatanode()998 public void testNamenodeProtocolGetBlocksFromNonexistentDatanode() 999 throws IOException { 1000 final short replication = 1; 1001 Path balancerTempDir = null; 1002 try { 1003 // reserve 2 nodes for test 1004 List<DNClient> testnodes = reserveDatanodesForTest(2); 1005 shutdownNonTestNodes(testnodes); 1006 1007 DNClient testnode1 = testnodes.get(0); 1008 DNClient testnode2 = testnodes.get(1); 1009 1010 // write some blocks with replication factor of 1 1011 balancerTempDir = makeTempDir(); 1012 generateFileSystemLoad(20, replication); 1013 1014 // get block locations from NN 1015 NNClient namenode = dfsCluster.getNNClient(); 1016 // TODO extend namenode to get block locations 1017 //namenode.get 1018 1019 // shutdown 1 node 1020 stopDatanode(testnode1); 1021 1022 // attempt to retrieve blocks from the dead node 1023 // we should fail 1024 } finally { 1025 // cleanup 1026 // finally block to run cleanup 1027 LOG.info("clean off test data from DFS [rmr ~/balancer-temp]"); 1028 try { 1029 deleteTempDir(balancerTempDir); 1030 } catch (Exception e) { 1031 LOG.warn("problem cleaning up temp dir", e); 1032 } 1033 } 1034 } 1035 } 1036 1037