/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;


public class TestReplicationPolicyWithNodeGroup {
  private static final int BLOCK_SIZE = 1024;
  private static final int NUM_OF_DATANODES = 8;
  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
  private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
  private final Configuration CONF = new HdfsConfiguration();
  private NetworkTopology cluster;
  private NameNode namenode;
  private BlockPlacementPolicy replicator;
  private static final String filename = "/dummyfile.txt";

  private static final DatanodeStorageInfo[] storages;
  private static final DatanodeDescriptor[] dataNodes;
  static {
    final String[] racks = {
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n2",
        "/d1/r2/n3",
        "/d1/r2/n3",
        "/d1/r2/n4",
        "/d2/r3/n5",
        "/d2/r3/n6"
    };
    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
  }

  private static final DatanodeStorageInfo[] storagesInBoundaryCase;
  private static final DatanodeDescriptor[] dataNodesInBoundaryCase;
  static {
    final String[] racksInBoundaryCase = {
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n2",
        "/d1/r2/n3",
        "/d1/r2/n3"
    };
    storagesInBoundaryCase = DFSTestUtil.createDatanodeStorageInfos(racksInBoundaryCase);
    dataNodesInBoundaryCase = DFSTestUtil.toDatanodeDescriptor(storagesInBoundaryCase);
  }
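  // Topology for the "more targets than node groups" case: twelve datanodes
  // spread over only six node groups on two racks, so at most six targets in
  // distinct node groups can ever be returned.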
  private static final DatanodeStorageInfo[] storagesInMoreTargetsCase;
  private final static DatanodeDescriptor[] dataNodesInMoreTargetsCase;
  static {
    final String[] racksInMoreTargetsCase = {
        "/r1/n1",
        "/r1/n1",
        "/r1/n2",
        "/r1/n2",
        "/r1/n3",
        "/r1/n3",
        "/r2/n4",
        "/r2/n4",
        "/r2/n5",
        "/r2/n5",
        "/r2/n6",
        "/r2/n6"
    };
    storagesInMoreTargetsCase = DFSTestUtil.createDatanodeStorageInfos(racksInMoreTargetsCase);
    dataNodesInMoreTargetsCase = DFSTestUtil.toDatanodeDescriptor(storagesInMoreTargetsCase);
  };

  private final static DatanodeDescriptor NODE =
      new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));

  private static final DatanodeStorageInfo[] storagesForDependencies;
  private static final DatanodeDescriptor[] dataNodesForDependencies;
  static {
    final String[] racksForDependencies = {
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n2",
        "/d1/r1/n2",
        "/d1/r1/n3",
        "/d1/r1/n4"
    };
    final String[] hostNamesForDependencies = {
        "h1",
        "h2",
        "h3",
        "h4",
        "h5",
        "h6"
    };

    storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos(
        racksForDependencies, hostNamesForDependencies);
    dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);

  };

  @Before
  public void setUp() throws Exception {
    FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
    CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    // Set properties to make HDFS aware of NodeGroup.
    CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyWithNodeGroup.class.getName());
    CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
        NetworkTopologyWithNodeGroup.class.getName());

    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);

    File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class);

    CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
        new File(baseDir, "name").getPath());

    DFSTestUtil.formatNameNode(CONF);
    namenode = new NameNode(CONF);
    final BlockManager bm = namenode.getNamesystem().getBlockManager();
    replicator = bm.getBlockPlacementPolicy();
    cluster = bm.getDatanodeManager().getNetworkTopology();
    // construct network topology
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.add(dataNodes[i]);
    }
    setupDataNodeCapacity();
  }

  @After
  public void tearDown() throws Exception {
    namenode.stop();
  }
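  // Pushes synthetic usage figures into the datanode's first storage and
  // refreshes its heartbeat, so the placement policy sees the given capacity,
  // remaining space, cache usage and xceiver count for that node.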
  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
      long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
      int volFailures) {
    dn.getStorageInfos()[0].setUtilizationForTesting(
        capacity, dfsUsed, remaining, blockPoolUsed);
    dn.updateHeartbeat(
        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
  }

  private static void setupDataNodeCapacity() {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }
  }

  /**
   * Scan the targets list: all targets should be on different NodeGroups.
   * Return false if two targets are found on the same NodeGroup.
   */
  private static boolean checkTargetsOnDifferentNodeGroup(
      DatanodeStorageInfo[] targets) {
    if(targets.length == 0)
      return true;
    Set<String> targetSet = new HashSet<String>();
    for(DatanodeStorageInfo storage:targets) {
      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
      if(targetSet.contains(nodeGroup)) {
        return false;
      } else {
        targetSet.add(nodeGroup);
      }
    }
    return true;
  }

  private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
    return isOnSameRack(left.getDatanodeDescriptor(), right);
  }

  private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
    return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
  }

  private boolean isOnSameNodeGroup(DatanodeStorageInfo left, DatanodeStorageInfo right) {
    return isOnSameNodeGroup(left.getDatanodeDescriptor(), right);
  }

  private boolean isOnSameNodeGroup(DatanodeDescriptor left, DatanodeStorageInfo right) {
    return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
  }
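  // Convenience overloads around replicator.chooseTarget(): unless specified,
  // the writer defaults to dataNodes[0], the chosen list is empty, no nodes
  // are excluded, and the default storage policy is used.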
  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
    return chooseTarget(numOfReplicas, dataNodes[0]);
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      DatanodeDescriptor writer) {
    return chooseTarget(numOfReplicas, writer,
        new ArrayList<DatanodeStorageInfo>());
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      List<DatanodeStorageInfo> chosenNodes) {
    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
  }

  private DatanodeStorageInfo[] chooseTarget(
      int numOfReplicas,
      DatanodeDescriptor writer,
      List<DatanodeStorageInfo> chosenNodes,
      Set<Node> excludedNodes) {
    return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
        false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
  }

  /**
   * In this testcase, the client is dataNodes[0]. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on a different
   * rack, and the 3rd should be placed on a different node (and node group)
   * of the rack chosen for the 2nd replica.
   * The only exception is when <i>numOfReplicas</i> is 2: then the 1st is on
   * dataNodes[0] and the 2nd is on a different rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget1() throws Exception {
    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        0L, 0L, 4, 0); // overloaded

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[0], targets[0]);

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertEquals(storages[0], targets[0]);

    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertEquals(storages[0], targets[0]);

    assertFalse(isOnSameRack(targets[0], targets[1]));
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameNodeGroup(targets[1], targets[2]));

    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertEquals(storages[0], targets[0]);

    assertTrue(isOnSameRack(targets[1], targets[2]) ||
        isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
    // Make sure no more than one replica is on the same nodegroup
    verifyNoTwoTargetsOnSameNodeGroup(targets);

    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
  }
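  // Asserts that every target reports a distinct network location; with
  // NetworkTopologyWithNodeGroup the location string includes the node group,
  // so no two chosen targets may share a node group.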
  private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
    Set<String> nodeGroupSet = new HashSet<String>();
    for (DatanodeStorageInfo target: targets) {
      nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
    }
    assertEquals(nodeGroupSet.size(), targets.length);
  }

  /**
   * In this testcase, the client is dataNodes[0], but dataNodes[1] is
   * not allowed to be chosen. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on a different
   * rack, the 3rd should be on the same rack as the 2nd replica but in a
   * different node group, and the rest should be placed on a third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget2() throws Exception {
    DatanodeStorageInfo[] targets;
    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();

    Set<Node> excludedNodes = new HashSet<Node>();
    excludedNodes.add(dataNodes[1]);
    targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
    assertEquals(targets.length, 4);
    assertEquals(storages[0], targets[0]);

    assertTrue(cluster.isNodeGroupAware());
    // Make sure no replicas are on the same nodegroup
    for (int i=1;i<4;i++) {
      assertFalse(isOnSameNodeGroup(targets[0], targets[i]));
    }
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
        isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[1], targets[3]));

    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]);
    chosenNodes.add(storages[2]);
    targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
    System.out.println("targets=" + Arrays.asList(targets));
    assertEquals(2, targets.length);
    //make sure that the chosen node is in the target.
    int i = 0;
    for(; i < targets.length && !storages[2].equals(targets[i]); i++);
    assertTrue(i < targets.length);
  }

  /**
   * In this testcase, the client is dataNodes[0], but dataNodes[0] is not
   * qualified to be chosen. So the 1st replica should be placed on dataNodes[1],
   * the 2nd replica should be placed on a different rack,
   * the 3rd replica should be placed on the same rack as the 2nd replica but
   * in a different nodegroup, and the rest should be placed on a third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget3() throws Exception {
    // make data node 0 not qualified to be chosen
    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
        0L, 0L, 0, 0); // no space

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[1], targets[0]);

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertEquals(storages[1], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertEquals(storages[1], targets[0]);
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertEquals(storages[1], targets[0]);
    assertTrue(cluster.isNodeGroupAware());
    verifyNoTwoTargetsOnSameNodeGroup(targets);
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
        isOnSameRack(targets[2], targets[3]));

    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
  }
  /**
   * In this testcase, the client is dataNodes[0], but none of the nodes on
   * rack 1 is qualified to be chosen. So the 1st replica should be placed on
   * either rack 2 or rack 3,
   * the 2nd replica should be placed on a different rack,
   * and the 3rd replica should be placed on the same rack as the 1st replica,
   * but in a different node group.
   * @throws Exception
   */
  @Test
  public void testChooseTarget4() throws Exception {
    // make data nodes 0-2 not qualified to be chosen: not enough disk space
    for(int i=0; i<3; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    for(int i=0; i<3; i++) {
      assertFalse(isOnSameRack(dataNodes[0], targets[i]));
    }
    verifyNoTwoTargetsOnSameNodeGroup(targets);
    assertTrue(isOnSameRack(targets[0], targets[1]) ||
        isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
  }

  /**
   * In this testcase, the client is a node outside of the file system.
   * So the 1st replica can be placed on any node,
   * the 2nd replica should be placed on a different rack,
   * and the 3rd replica should be placed on the same rack as the 2nd replica.
   * @throws Exception
   */
  @Test
  public void testChooseTarget5() throws Exception {
    setupDataNodeCapacity();
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, NODE);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, NODE);
    assertEquals(targets.length, 1);

    targets = chooseTarget(2, NODE);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, NODE);
    assertEquals(targets.length, 3);
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[1]));
    verifyNoTwoTargetsOnSameNodeGroup(targets);
  }
  /**
   * This testcase tests re-replication when dataNodes[0] is already chosen.
   * So the 1st replica can be placed on a random rack,
   * the 2nd replica should be placed on a different node and node group but
   * on the same rack as the 1st replica, and the 3rd replica can be placed
   * randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate1() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storages[0]);
    DatanodeStorageInfo[] targets;

    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, chosenNodes);
    assertEquals(targets.length, 3);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
  }

  /**
   * This testcase tests re-replication
   * when dataNodes[0] and dataNodes[1] are already chosen.
   * So the 1st replica should be placed on a rack other than rack 1,
   * and the remaining replicas can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate2() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storages[0]);
    chosenNodes.add(storages[1]);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]) &&
        isOnSameRack(dataNodes[0], targets[1]));
  }
  /**
   * This testcase tests re-replication
   * when dataNodes[0] and dataNodes[3] are already chosen.
   * So the 1st replica should be placed on the rack where the writer resides,
   * and the remaining replicas can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate3() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storages[0]);
    chosenNodes.add(storages[3]);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(dataNodes[3], targets[0]));

    targets = chooseTarget(1, dataNodes[3], chosenNodes);
    assertEquals(targets.length, 1);
    assertTrue(isOnSameRack(dataNodes[3], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodes[3], targets[0]));
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));

    targets = chooseTarget(2, dataNodes[3], chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(dataNodes[3], targets[0]));
  }

  /**
   * Test that chooseReplicaToDelete selects replicas to remove based on
   * block locality and free space.
   */
  @Test
  public void testChooseReplicaToDelete() throws Exception {
    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
    final Map<String, List<DatanodeStorageInfo>> rackMap
        = new HashMap<String, List<DatanodeStorageInfo>>();
    dataNodes[0].setRemaining(4*1024*1024);
    replicaList.add(storages[0]);

    dataNodes[1].setRemaining(3*1024*1024);
    replicaList.add(storages[1]);

    dataNodes[2].setRemaining(2*1024*1024);
    replicaList.add(storages[2]);

    dataNodes[5].setRemaining(1*1024*1024);
    replicaList.add(storages[5]);

    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
    replicator.splitNodesWithRack(
        replicaList, rackMap, first, second);
    assertEquals(3, first.size());
    assertEquals(1, second.size());
    List<StorageType> excessTypes = new ArrayList<StorageType>();
    excessTypes.add(StorageType.DEFAULT);
    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
        null, null, (short)3, first, second, excessTypes);
    // Within the first set {dataNodes[0], dataNodes[1], dataNodes[2]},
    // dataNodes[0] and dataNodes[1] are in the same nodegroup,
    // but dataNodes[1] is chosen because it has less free space
    assertEquals(chosen, storages[1]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(2, first.size());
    assertEquals(1, second.size());
    // Within the first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is
    // chosen because it has less free space
    excessTypes.add(StorageType.DEFAULT);
    chosen = replicator.chooseReplicaToDelete(
        null, null, (short)2, first, second, excessTypes);
    assertEquals(chosen, storages[2]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(0, first.size());
    assertEquals(2, second.size());
    // Within the second set, dataNodes[5] is chosen because it has less free space
    excessTypes.add(StorageType.DEFAULT);
    chosen = replicator.chooseReplicaToDelete(
        null, null, (short)1, first, second, excessTypes);
    assertEquals(chosen, storages[5]);
  }

  /**
   * Test the replica placement policy on a boundary topology:
   * rack 2 has only one node group and therefore cannot hold two replicas.
   * The 1st replica will be placed on the writer,
   * the 2nd replica should be placed on a different rack, and
   * the 3rd replica should be placed on the same rack as the writer, but in a
   * different node group.
   */
  @Test
  public void testChooseTargetsOnBoundaryTopology() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      cluster.add(dataNodesInBoundaryCase[i]);
    }
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      updateHeartbeatWithUsage(dataNodes[0],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE,
          0L, 0L, 0L, 0, 0);

      updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 1);

    targets = chooseTarget(2, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 3);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
  }

  /**
   * Test the re-replication policy in the boundary case.
   * Rack 2 has only one node group and the node in this node group is chosen;
   * rack 1 has two node groups and one of them is chosen.
   * The replica policy should choose a node from a node group of rack 1 that
   * is different from the node groups of the already-chosen nodes.
   */
  @Test
  public void testRereplicateOnBoundaryTopology() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storagesInBoundaryCase[0]);
    chosenNodes.add(storagesInBoundaryCase[5]);
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes);
    assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[0], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[5], targets[0]));
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
  }

  /**
   * Test the replica placement policy when more targets are requested than
   * there are NodeGroups.
   * The 12-node cluster only has 6 NodeGroups, but in some cases, such as
   * placing a submitted job file, more targets (10) are requested for
   * placing replicas. Verify that only 6 targets are returned.
   */
  @Test
  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
      if (cluster.contains(node)) {
        cluster.remove(node);
      }
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      cluster.add(dataNodesInMoreTargetsCase[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    DatanodeStorageInfo[] targets;
    // Test normal case -- 3 replicas
    targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]);
    assertEquals(targets.length, 3);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));

    // Test special case -- replica number over node groups.
    targets = chooseTarget(10, dataNodesInMoreTargetsCase[0]);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
    // Verify that it can only find 6 targets for placing replicas.
    assertEquals(targets.length, 6);
  }

  /**
   * Test target selection when datanodes declare host-name dependencies on
   * each other: two dependent nodes must never both be chosen, and the
   * dependent nodes are added to the excluded set.
   */
  @Test
  public void testChooseTargetWithDependencies() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
      if (cluster.contains(node)) {
        cluster.remove(node);
      }
    }

    Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
        .getBlockManager()
        .getDatanodeManager().getHost2DatanodeMap();
    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
      cluster.add(dataNodesForDependencies[i]);
      host2DatanodeMap.add(dataNodesForDependencies[i]);
    }

    //add dependencies (node1 <-> node2, and node3 <-> node4)
    dataNodesForDependencies[1].addDependentHostName(
        dataNodesForDependencies[2].getHostName());
    dataNodesForDependencies[2].addDependentHostName(
        dataNodesForDependencies[1].getHostName());
    dataNodesForDependencies[3].addDependentHostName(
        dataNodesForDependencies[4].getHostName());
    dataNodesForDependencies[4].addDependentHostName(
        dataNodesForDependencies[3].getHostName());

    //Update heartbeat
    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
      updateHeartbeatWithUsage(dataNodesForDependencies[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();

    DatanodeStorageInfo[] targets;
    Set<Node> excludedNodes = new HashSet<Node>();
    excludedNodes.add(dataNodesForDependencies[5]);

    //try to select three targets as there are three node groups
    targets = chooseTarget(3, dataNodesForDependencies[1], chosenNodes, excludedNodes);

    //Even though there are three node groups, verify that
    //only two targets are selected due to dependencies
    assertEquals(targets.length, 2);
    assertEquals(targets[0], storagesForDependencies[1]);
    assertTrue(targets[1].equals(storagesForDependencies[3]) ||
        targets[1].equals(storagesForDependencies[4]));

    //verify that all data nodes are in the excluded list
    assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
      assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
    }
  }
}