1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.junit.Assert.assertEquals; 21 import static org.junit.Assert.assertNotNull; 22 import static org.junit.Assert.assertNull; 23 import static org.junit.Assert.assertTrue; 24 25 import java.io.IOException; 26 import java.util.ArrayList; 27 import java.util.Arrays; 28 import java.util.Collection; 29 import java.util.Iterator; 30 import java.util.List; 31 import java.util.Random; 32 import java.util.concurrent.ExecutionException; 33 34 import com.google.common.base.Supplier; 35 import com.google.common.collect.Lists; 36 import org.apache.hadoop.conf.Configuration; 37 import org.apache.hadoop.fs.BlockLocation; 38 import org.apache.hadoop.fs.CommonConfigurationKeys; 39 import org.apache.hadoop.fs.FSDataOutputStream; 40 import org.apache.hadoop.fs.FileSystem; 41 import org.apache.hadoop.fs.Path; 42 import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 43 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 44 import org.apache.hadoop.hdfs.protocol.DatanodeID; 45 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 46 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; 47 import 
org.apache.hadoop.hdfs.protocol.ExtendedBlock; 48 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; 49 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 50 import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 51 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; 52 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; 53 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; 54 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; 55 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; 56 import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager; 57 import org.apache.hadoop.hdfs.server.datanode.DataNode; 58 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; 59 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; 60 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; 61 import org.apache.hadoop.hdfs.server.namenode.NameNode; 62 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; 63 import org.apache.hadoop.test.GenericTestUtils; 64 import org.apache.hadoop.test.PathUtils; 65 import org.apache.log4j.Level; 66 import org.junit.After; 67 import org.junit.Assert; 68 import org.junit.Before; 69 import org.junit.Ignore; 70 import org.junit.Test; 71 import org.slf4j.Logger; 72 import org.slf4j.LoggerFactory; 73 74 /** 75 * This class tests the decommissioning of nodes. 
76 */ 77 public class TestDecommission { 78 public static final Logger LOG = LoggerFactory.getLogger(TestDecommission 79 .class); 80 static final long seed = 0xDEADBEEFL; 81 static final int blockSize = 8192; 82 static final int fileSize = 16384; 83 static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds 84 static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec 85 static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval 86 87 final Random myrand = new Random(); 88 Path dir; 89 Path hostsFile; 90 Path excludeFile; 91 FileSystem localFileSys; 92 Configuration conf; 93 MiniDFSCluster cluster = null; 94 95 @Before setup()96 public void setup() throws IOException { 97 conf = new HdfsConfiguration(); 98 // Set up the hosts/exclude files. 99 localFileSys = FileSystem.getLocal(conf); 100 Path workingDir = localFileSys.getWorkingDirectory(); 101 dir = new Path(workingDir, PathUtils.getTestDirName(getClass()) + "/work-dir/decommission"); 102 hostsFile = new Path(dir, "hosts"); 103 excludeFile = new Path(dir, "exclude"); 104 105 // Setup conf 106 conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); 107 conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath()); 108 conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath()); 109 conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); 110 conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); 111 conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); 112 conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC); 113 conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4); 114 conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL); 115 116 writeConfigFile(hostsFile, null); 117 writeConfigFile(excludeFile, null); 118 } 119 120 @After teardown()121 public void teardown() throws 
IOException { 122 cleanupFile(localFileSys, dir); 123 if (cluster != null) { 124 cluster.shutdown(); 125 } 126 } 127 writeConfigFile(Path name, List<String> nodes)128 private void writeConfigFile(Path name, List<String> nodes) 129 throws IOException { 130 // delete if it already exists 131 if (localFileSys.exists(name)) { 132 localFileSys.delete(name, true); 133 } 134 135 FSDataOutputStream stm = localFileSys.create(name); 136 137 if (nodes != null) { 138 for (Iterator<String> it = nodes.iterator(); it.hasNext();) { 139 String node = it.next(); 140 stm.writeBytes(node); 141 stm.writeBytes("\n"); 142 } 143 } 144 stm.close(); 145 } 146 writeFile(FileSystem fileSys, Path name, int repl)147 private void writeFile(FileSystem fileSys, Path name, int repl) 148 throws IOException { 149 // create and write a file that contains three blocks of data 150 FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf() 151 .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), 152 (short) repl, blockSize); 153 byte[] buffer = new byte[fileSize]; 154 Random rand = new Random(seed); 155 rand.nextBytes(buffer); 156 stm.write(buffer); 157 stm.close(); 158 LOG.info("Created file " + name + " with " + repl + " replicas."); 159 } 160 161 /** 162 * Verify that the number of replicas are as expected for each block in 163 * the given file. 164 * For blocks with a decommissioned node, verify that their replication 165 * is 1 more than what is specified. 166 * For blocks without decommissioned nodes, verify their replication is 167 * equal to what is specified. 168 * 169 * @param downnode - if null, there is no decommissioned node for this file. 170 * @return - null if no failure found, else an error message string. 
171 */ checkFile(FileSystem fileSys, Path name, int repl, String downnode, int numDatanodes)172 private static String checkFile(FileSystem fileSys, Path name, int repl, 173 String downnode, int numDatanodes) throws IOException { 174 boolean isNodeDown = (downnode != null); 175 // need a raw stream 176 assertTrue("Not HDFS:"+fileSys.getUri(), 177 fileSys instanceof DistributedFileSystem); 178 HdfsDataInputStream dis = (HdfsDataInputStream) 179 fileSys.open(name); 180 Collection<LocatedBlock> dinfo = dis.getAllBlocks(); 181 for (LocatedBlock blk : dinfo) { // for each block 182 int hasdown = 0; 183 DatanodeInfo[] nodes = blk.getLocations(); 184 for (int j = 0; j < nodes.length; j++) { // for each replica 185 if (isNodeDown && nodes[j].getXferAddr().equals(downnode)) { 186 hasdown++; 187 //Downnode must actually be decommissioned 188 if (!nodes[j].isDecommissioned()) { 189 return "For block " + blk.getBlock() + " replica on " + 190 nodes[j] + " is given as downnode, " + 191 "but is not decommissioned"; 192 } 193 //Decommissioned node (if any) should only be last node in list. 
194 if (j != nodes.length - 1) { 195 return "For block " + blk.getBlock() + " decommissioned node " 196 + nodes[j] + " was not last node in list: " 197 + (j + 1) + " of " + nodes.length; 198 } 199 LOG.info("Block " + blk.getBlock() + " replica on " + 200 nodes[j] + " is decommissioned."); 201 } else { 202 //Non-downnodes must not be decommissioned 203 if (nodes[j].isDecommissioned()) { 204 return "For block " + blk.getBlock() + " replica on " + 205 nodes[j] + " is unexpectedly decommissioned"; 206 } 207 } 208 } 209 210 LOG.info("Block " + blk.getBlock() + " has " + hasdown 211 + " decommissioned replica."); 212 if(Math.min(numDatanodes, repl+hasdown) != nodes.length) { 213 return "Wrong number of replicas for block " + blk.getBlock() + 214 ": " + nodes.length + ", expected " + 215 Math.min(numDatanodes, repl+hasdown); 216 } 217 } 218 return null; 219 } 220 cleanupFile(FileSystem fileSys, Path name)221 private void cleanupFile(FileSystem fileSys, Path name) throws IOException { 222 assertTrue(fileSys.exists(name)); 223 fileSys.delete(name, true); 224 assertTrue(!fileSys.exists(name)); 225 } 226 227 /* 228 * decommission the DN at index dnIndex or one random node if dnIndex is set 229 * to -1 and wait for the node to reach the given {@code waitForState}. 230 */ decommissionNode(int nnIndex, String datanodeUuid, ArrayList<DatanodeInfo>decommissionedNodes, AdminStates waitForState)231 private DatanodeInfo decommissionNode(int nnIndex, 232 String datanodeUuid, 233 ArrayList<DatanodeInfo>decommissionedNodes, 234 AdminStates waitForState) 235 throws IOException { 236 DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf); 237 DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); 238 239 // 240 // pick one datanode randomly unless the caller specifies one. 
241 // 242 int index = 0; 243 if (datanodeUuid == null) { 244 boolean found = false; 245 while (!found) { 246 index = myrand.nextInt(info.length); 247 if (!info[index].isDecommissioned()) { 248 found = true; 249 } 250 } 251 } else { 252 // The caller specifies a DN 253 for (; index < info.length; index++) { 254 if (info[index].getDatanodeUuid().equals(datanodeUuid)) { 255 break; 256 } 257 } 258 if (index == info.length) { 259 throw new IOException("invalid datanodeUuid " + datanodeUuid); 260 } 261 } 262 String nodename = info[index].getXferAddr(); 263 LOG.info("Decommissioning node: " + nodename); 264 265 // write nodename into the exclude file. 266 ArrayList<String> nodes = new ArrayList<String>(); 267 if (decommissionedNodes != null) { 268 for (DatanodeInfo dn : decommissionedNodes) { 269 nodes.add(dn.getName()); 270 } 271 } 272 nodes.add(nodename); 273 writeConfigFile(excludeFile, nodes); 274 refreshNodes(cluster.getNamesystem(nnIndex), conf); 275 DatanodeInfo ret = NameNodeAdapter.getDatanode( 276 cluster.getNamesystem(nnIndex), info[index]); 277 waitNodeState(ret, waitForState); 278 return ret; 279 } 280 281 /* Ask a specific NN to stop decommission of the datanode and wait for each 282 * to reach the NORMAL state. 283 */ recommissionNode(int nnIndex, DatanodeInfo decommissionedNode)284 private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException { 285 LOG.info("Recommissioning node: " + decommissionedNode); 286 writeConfigFile(excludeFile, null); 287 refreshNodes(cluster.getNamesystem(nnIndex), conf); 288 waitNodeState(decommissionedNode, AdminStates.NORMAL); 289 290 } 291 292 /* 293 * Wait till node is fully decommissioned. 
*/
  private void waitNodeState(DatanodeInfo node,
                             AdminStates state) {
    // Poll twice per heartbeat interval until the state matches.
    while (state != node.getAdminState()) {
      LOG.info("Waiting for node " + node + " to change state to "
          + state + " current state: " + node.getAdminState());
      try {
        Thread.sleep(HEARTBEAT_INTERVAL * 500);
      } catch (InterruptedException e) {
        // Swallowing the interrupt would keep this loop polling forever
        // after the test thread has been told to stop. Restore the flag
        // and abort the wait instead.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for node "
            + node + " to reach state " + state, e);
      }
    }
    LOG.info("node " + node + " reached the state " + state);
  }

  /**
   * Creates a new DFSClient connected to the given namenode.
   *
   * @param nn   namenode to connect to
   * @param conf client configuration
   * @return a new client; the caller owns it
   * @throws IOException if the client cannot be created
   */
  private static DFSClient getDfsClient(NameNode nn,
      Configuration conf) throws IOException {
    return new DFSClient(nn.getNameNodeAddress(), conf);
  }

  /**
   * Asserts that the namenode behind {@code client} reports exactly
   * {@code numDNs} live datanodes.
   *
   * @param client client connected to the namenode to query
   * @param numDNs expected number of live datanodes
   * @throws IOException if the datanode report cannot be fetched
   */
  private static void validateCluster(DFSClient client, int numDNs)
      throws IOException {
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
    assertEquals("Number of Datanodes ", numDNs, info.length);
  }

  /**
   * Starts a federated MiniDFSCluster, stores it in {@code cluster}, waits
   * for it to become active and checks that every namenode sees all
   * datanodes.
   *
   * @param numNameNodes number of namenodes in the federation
   * @param numDatanodes number of datanodes to start
   * @param conf         configuration used for the cluster and the clients
   * @throws IOException if the cluster cannot be started or queried
   */
  private void startCluster(int numNameNodes, int numDatanodes,
      Configuration conf) throws IOException {
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
        .numDataNodes(numDatanodes).build();
    cluster.waitActive();
    for (int i = 0; i < numNameNodes; i++) {
      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
      validateCluster(client, numDatanodes);
    }
  }

  /**
   * Asks the namesystem's datanode manager to refresh its node lists from
   * the given configuration.
   *
   * @param ns   namesystem whose datanode manager is refreshed
   * @param conf configuration handed to the refresh
   * @throws IOException if the refresh fails
   */
  static void refreshNodes(final FSNamesystem ns, final Configuration conf
      ) throws IOException {
    ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
  }

verifyStats(NameNode namenode, FSNamesystem fsn, DatanodeInfo info, DataNode node, boolean decommissioning)343 private void verifyStats(NameNode namenode, FSNamesystem fsn, 344 DatanodeInfo info, DataNode node, boolean decommissioning) 345 throws InterruptedException, IOException { 346 // Do the stats check over 10 heartbeats 347 for (int i = 0; i < 10; i++) { 348 long[] newStats = namenode.getRpcServer().getStats(); 349 350 // For decommissioning nodes, ensure capacity of the DN is no longer 351 // counted. Only used space of the DN is counted in cluster capacity 352 assertEquals(newStats[0], 353 decommissioning ? info.getDfsUsed() : info.getCapacity()); 354 355 // Ensure cluster used capacity is counted for both normal and 356 // decommissioning nodes 357 assertEquals(newStats[1], info.getDfsUsed()); 358 359 // For decommissioning nodes, remaining space from the DN is not counted 360 assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining()); 361 362 // Ensure transceiver count is same as that DN 363 assertEquals(fsn.getTotalLoad(), info.getXceiverCount()); 364 DataNodeTestUtils.triggerHeartbeat(node); 365 } 366 } 367 368 /** 369 * Tests decommission for non federated cluster 370 */ 371 @Test(timeout=360000) testDecommission()372 public void testDecommission() throws IOException { 373 testDecommission(1, 6); 374 } 375 376 /** 377 * Tests decommission with replicas on the target datanode cannot be migrated 378 * to other datanodes and satisfy the replication factor. Make sure the 379 * datanode won't get stuck in decommissioning state. 
380 */ 381 @Test(timeout = 360000) testDecommission2()382 public void testDecommission2() throws IOException { 383 LOG.info("Starting test testDecommission"); 384 int numNamenodes = 1; 385 int numDatanodes = 4; 386 conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3); 387 startCluster(numNamenodes, numDatanodes, conf); 388 389 ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>( 390 numNamenodes); 391 namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes)); 392 393 Path file1 = new Path("testDecommission2.dat"); 394 int replicas = 4; 395 396 // Start decommissioning one namenode at a time 397 ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0); 398 FileSystem fileSys = cluster.getFileSystem(0); 399 FSNamesystem ns = cluster.getNamesystem(0); 400 401 writeFile(fileSys, file1, replicas); 402 403 int deadDecomissioned = ns.getNumDecomDeadDataNodes(); 404 int liveDecomissioned = ns.getNumDecomLiveDataNodes(); 405 406 // Decommission one node. Verify that node is decommissioned. 
407 DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes, 408 AdminStates.DECOMMISSIONED); 409 decommissionedNodes.add(decomNode); 410 assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); 411 assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes()); 412 413 // Ensure decommissioned datanode is not automatically shutdown 414 DFSClient client = getDfsClient(cluster.getNameNode(0), conf); 415 assertEquals("All datanodes must be alive", numDatanodes, 416 client.datanodeReport(DatanodeReportType.LIVE).length); 417 assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), 418 numDatanodes)); 419 cleanupFile(fileSys, file1); 420 421 // Restart the cluster and ensure recommissioned datanodes 422 // are allowed to register with the namenode 423 cluster.shutdown(); 424 startCluster(1, 4, conf); 425 cluster.shutdown(); 426 } 427 428 /** 429 * Test decommission for federeated cluster 430 */ 431 @Test(timeout=360000) testDecommissionFederation()432 public void testDecommissionFederation() throws IOException { 433 testDecommission(2, 2); 434 } 435 436 /** 437 * Test decommission process on standby NN. 438 * Verify admins can run "dfsadmin -refreshNodes" on SBN and decomm 439 * process can finish as long as admins run "dfsadmin -refreshNodes" 440 * on active NN. 441 * SBN used to mark excess replica upon recommission. The SBN's pick 442 * for excess replica could be different from the one picked by ANN. 443 * That creates inconsistent state and prevent SBN from finishing 444 * decommission. 
445 */ 446 @Test(timeout=360000) testDecommissionOnStandby()447 public void testDecommissionOnStandby() throws Exception { 448 Configuration hdfsConf = new HdfsConfiguration(conf); 449 hdfsConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); 450 hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30000); 451 hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2); 452 453 // The time to wait so that the slow DN's heartbeat is considered old 454 // by BlockPlacementPolicyDefault and thus will choose that DN for 455 // excess replica. 456 long slowHeartbeatDNwaitTime = 457 hdfsConf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 458 DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (hdfsConf.getInt( 459 DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 460 DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1); 461 462 cluster = new MiniDFSCluster.Builder(hdfsConf) 463 .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3).build(); 464 465 cluster.transitionToActive(0); 466 cluster.waitActive(); 467 468 469 // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs. 470 // The last DN is empty. Also configure the last DN to have slow heartbeat 471 // so that it will be chosen as excess replica candidate during recommission. 472 473 // Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the 474 // same as # of DNs, each DN will have a replica for any block. 475 Path file1 = new Path("testDecommissionHA.dat"); 476 int replicas = 3; 477 FileSystem activeFileSys = cluster.getFileSystem(0); 478 writeFile(activeFileSys, file1, replicas); 479 480 HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), 481 cluster.getNameNode(1)); 482 483 // Step 1.b, start a DN with slow heartbeat, so that we can know for sure it 484 // will be chosen as the target of excess replica during recommission. 
485 hdfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); 486 cluster.startDataNodes(hdfsConf, 1, true, null, null, null); 487 DataNode lastDN = cluster.getDataNodes().get(3); 488 lastDN.getDatanodeUuid(); 489 490 // Step 2, decommission the first DN at both ANN and SBN. 491 DataNode firstDN = cluster.getDataNodes().get(0); 492 493 // Step 2.a, ask ANN to decomm the first DN 494 DatanodeInfo decommissionedNodeFromANN = decommissionNode( 495 0, firstDN.getDatanodeUuid(), null, AdminStates.DECOMMISSIONED); 496 497 // Step 2.b, ask SBN to decomm the first DN 498 DatanodeInfo decomNodeFromSBN = decommissionNode(1, firstDN.getDatanodeUuid(), null, 499 AdminStates.DECOMMISSIONED); 500 501 // Step 3, recommission the first DN on SBN and ANN to create excess replica 502 // It recommissions the node on SBN first to create potential 503 // inconsistent state. In production cluster, such insistent state can happen 504 // even if recommission command was issued on ANN first given the async nature 505 // of the system. 506 507 // Step 3.a, ask SBN to recomm the first DN. 508 // SBN has been fixed so that it no longer invalidates excess replica during 509 // recommission. 510 // Before the fix, SBN could get into the following state. 511 // 1. the last DN would have been chosen as excess replica, given its 512 // heartbeat is considered old. 513 // Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete 514 // 2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 ) 515 // and one excess replica ( 3 ) 516 // After the fix, 517 // After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 ) 518 Thread.sleep(slowHeartbeatDNwaitTime); 519 recommissionNode(1, decomNodeFromSBN); 520 521 // Step 3.b, ask ANN to recommission the first DN. 522 // To verify the fix, the test makes sure the excess replica picked by ANN 523 // is different from the one picked by SBN before the fix. 
524 // To achieve that, we make sure next-to-last DN is chosen as excess replica 525 // by ANN. 526 // 1. restore LastDNprop's heartbeat interval. 527 // 2. Make next-to-last DN's heartbeat slow. 528 MiniDFSCluster.DataNodeProperties LastDNprop = cluster.stopDataNode(3); 529 LastDNprop.conf.setLong( 530 DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL); 531 cluster.restartDataNode(LastDNprop); 532 533 MiniDFSCluster.DataNodeProperties nextToLastDNprop = cluster.stopDataNode(2); 534 nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); 535 cluster.restartDataNode(nextToLastDNprop); 536 cluster.waitActive(); 537 Thread.sleep(slowHeartbeatDNwaitTime); 538 recommissionNode(0, decommissionedNodeFromANN); 539 540 // Step 3.c, make sure the DN has deleted the block and report to NNs 541 cluster.triggerHeartbeats(); 542 HATestUtil.waitForDNDeletions(cluster); 543 cluster.triggerDeletionReports(); 544 545 // Step 4, decommission the first DN on both ANN and SBN 546 // With the fix to make sure SBN no longer marks excess replica 547 // during recommission, SBN's decommission can finish properly 548 decommissionNode(0, firstDN.getDatanodeUuid(), null, 549 AdminStates.DECOMMISSIONED); 550 551 // Ask SBN to decomm the first DN 552 decommissionNode(1, firstDN.getDatanodeUuid(), null, 553 AdminStates.DECOMMISSIONED); 554 555 cluster.shutdown(); 556 557 } 558 testDecommission(int numNamenodes, int numDatanodes)559 private void testDecommission(int numNamenodes, int numDatanodes) 560 throws IOException { 561 LOG.info("Starting test testDecommission"); 562 startCluster(numNamenodes, numDatanodes, conf); 563 564 ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = 565 new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes); 566 for(int i = 0; i < numNamenodes; i++) { 567 namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes)); 568 } 569 Path file1 = new Path("testDecommission.dat"); 570 for (int iteration = 0; iteration < numDatanodes - 
1; iteration++) { 571 int replicas = numDatanodes - iteration - 1; 572 573 // Start decommissioning one namenode at a time 574 for (int i = 0; i < numNamenodes; i++) { 575 ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i); 576 FileSystem fileSys = cluster.getFileSystem(i); 577 FSNamesystem ns = cluster.getNamesystem(i); 578 579 writeFile(fileSys, file1, replicas); 580 581 int deadDecomissioned = ns.getNumDecomDeadDataNodes(); 582 int liveDecomissioned = ns.getNumDecomLiveDataNodes(); 583 584 // Decommission one node. Verify that node is decommissioned. 585 DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes, 586 AdminStates.DECOMMISSIONED); 587 decommissionedNodes.add(decomNode); 588 assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes()); 589 assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes()); 590 591 // Ensure decommissioned datanode is not automatically shutdown 592 DFSClient client = getDfsClient(cluster.getNameNode(i), conf); 593 assertEquals("All datanodes must be alive", numDatanodes, 594 client.datanodeReport(DatanodeReportType.LIVE).length); 595 // wait for the block to be replicated 596 int tries = 0; 597 while (tries++ < 20) { 598 try { 599 Thread.sleep(1000); 600 if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), 601 numDatanodes) == null) { 602 break; 603 } 604 } catch (InterruptedException ie) { 605 } 606 } 607 assertTrue("Checked if block was replicated after decommission, tried " 608 + tries + " times.", tries < 20); 609 cleanupFile(fileSys, file1); 610 } 611 } 612 613 // Restart the cluster and ensure decommissioned datanodes 614 // are allowed to register with the namenode 615 cluster.shutdown(); 616 startCluster(numNamenodes, numDatanodes, conf); 617 cluster.shutdown(); 618 } 619 620 /** 621 * Test that over-replicated blocks are deleted on recommission. 
622 */ 623 @Test(timeout=120000) testRecommission()624 public void testRecommission() throws Exception { 625 final int numDatanodes = 6; 626 try { 627 LOG.info("Starting test testRecommission"); 628 629 startCluster(1, numDatanodes, conf); 630 631 final Path file1 = new Path("testDecommission.dat"); 632 final int replicas = numDatanodes - 1; 633 634 ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); 635 final FileSystem fileSys = cluster.getFileSystem(); 636 637 // Write a file to n-1 datanodes 638 writeFile(fileSys, file1, replicas); 639 640 // Decommission one of the datanodes with a replica 641 BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0]; 642 assertEquals("Unexpected number of replicas from getFileBlockLocations", 643 replicas, loc.getHosts().length); 644 final String toDecomHost = loc.getNames()[0]; 645 String toDecomUuid = null; 646 for (DataNode d : cluster.getDataNodes()) { 647 if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) { 648 toDecomUuid = d.getDatanodeId().getDatanodeUuid(); 649 break; 650 } 651 } 652 assertNotNull("Could not find a dn with the block!", toDecomUuid); 653 final DatanodeInfo decomNode = 654 decommissionNode(0, toDecomUuid, decommissionedNodes, 655 AdminStates.DECOMMISSIONED); 656 decommissionedNodes.add(decomNode); 657 final BlockManager blockManager = 658 cluster.getNamesystem().getBlockManager(); 659 final DatanodeManager datanodeManager = 660 blockManager.getDatanodeManager(); 661 BlockManagerTestUtil.recheckDecommissionState(datanodeManager); 662 663 // Ensure decommissioned datanode is not automatically shutdown 664 DFSClient client = getDfsClient(cluster.getNameNode(), conf); 665 assertEquals("All datanodes must be alive", numDatanodes, 666 client.datanodeReport(DatanodeReportType.LIVE).length); 667 668 // wait for the block to be replicated 669 final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1); 670 final String uuid = toDecomUuid; 671 GenericTestUtils.waitFor(new 
Supplier<Boolean>() { 672 @Override 673 public Boolean get() { 674 BlockInfoContiguous info = 675 blockManager.getStoredBlock(b.getLocalBlock()); 676 int count = 0; 677 StringBuilder sb = new StringBuilder("Replica locations: "); 678 for (int i = 0; i < info.numNodes(); i++) { 679 DatanodeDescriptor dn = info.getDatanode(i); 680 sb.append(dn + ", "); 681 if (!dn.getDatanodeUuid().equals(uuid)) { 682 count++; 683 } 684 } 685 LOG.info(sb.toString()); 686 LOG.info("Count: " + count); 687 return count == replicas; 688 } 689 }, 500, 30000); 690 691 // redecommission and wait for over-replication to be fixed 692 recommissionNode(0, decomNode); 693 BlockManagerTestUtil.recheckDecommissionState(datanodeManager); 694 DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0); 695 696 cleanupFile(fileSys, file1); 697 } finally { 698 if (cluster != null) { 699 cluster.shutdown(); 700 } 701 } 702 } 703 704 /** 705 * Tests cluster storage statistics during decommissioning for non 706 * federated cluster 707 */ 708 @Test(timeout=360000) testClusterStats()709 public void testClusterStats() throws Exception { 710 testClusterStats(1); 711 } 712 713 /** 714 * Tests cluster storage statistics during decommissioning for 715 * federated cluster 716 */ 717 @Test(timeout=360000) testClusterStatsFederation()718 public void testClusterStatsFederation() throws Exception { 719 testClusterStats(3); 720 } 721 testClusterStats(int numNameNodes)722 public void testClusterStats(int numNameNodes) throws IOException, 723 InterruptedException { 724 LOG.info("Starting test testClusterStats"); 725 int numDatanodes = 1; 726 startCluster(numNameNodes, numDatanodes, conf); 727 728 for (int i = 0; i < numNameNodes; i++) { 729 FileSystem fileSys = cluster.getFileSystem(i); 730 Path file = new Path("testClusterStats.dat"); 731 writeFile(fileSys, file, 1); 732 733 FSNamesystem fsn = cluster.getNamesystem(i); 734 NameNode namenode = cluster.getNameNode(i); 735 736 DatanodeInfo decomInfo = decommissionNode(i, 
null, null, 737 AdminStates.DECOMMISSION_INPROGRESS); 738 DataNode decomNode = getDataNode(decomInfo); 739 // Check namenode stats for multiple datanode heartbeats 740 verifyStats(namenode, fsn, decomInfo, decomNode, true); 741 742 // Stop decommissioning and verify stats 743 writeConfigFile(excludeFile, null); 744 refreshNodes(fsn, conf); 745 DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo); 746 DataNode retNode = getDataNode(decomInfo); 747 waitNodeState(retInfo, AdminStates.NORMAL); 748 verifyStats(namenode, fsn, retInfo, retNode, false); 749 } 750 } 751 getDataNode(DatanodeInfo decomInfo)752 private DataNode getDataNode(DatanodeInfo decomInfo) { 753 DataNode decomNode = null; 754 for (DataNode dn: cluster.getDataNodes()) { 755 if (decomInfo.equals(dn.getDatanodeId())) { 756 decomNode = dn; 757 break; 758 } 759 } 760 assertNotNull("Could not find decomNode in cluster!", decomNode); 761 return decomNode; 762 } 763 764 /** 765 * Test host/include file functionality. Only datanodes 766 * in the include file are allowed to connect to the namenode in a non 767 * federated cluster. 768 */ 769 @Test(timeout=360000) testHostsFile()770 public void testHostsFile() throws IOException, InterruptedException { 771 // Test for a single namenode cluster 772 testHostsFile(1); 773 } 774 775 /** 776 * Test host/include file functionality. Only datanodes 777 * in the include file are allowed to connect to the namenode in a 778 * federated cluster. 
779 */ 780 @Test(timeout=360000) testHostsFileFederation()781 public void testHostsFileFederation() throws IOException, InterruptedException { 782 // Test for 3 namenode federated cluster 783 testHostsFile(3); 784 } 785 testHostsFile(int numNameNodes)786 public void testHostsFile(int numNameNodes) throws IOException, 787 InterruptedException { 788 int numDatanodes = 1; 789 cluster = new MiniDFSCluster.Builder(conf) 790 .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes)) 791 .numDataNodes(numDatanodes).setupHostsFile(true).build(); 792 cluster.waitActive(); 793 794 // Now empty hosts file and ensure the datanode is disallowed 795 // from talking to namenode, resulting in it's shutdown. 796 ArrayList<String>list = new ArrayList<String>(); 797 final String bogusIp = "127.0.30.1"; 798 list.add(bogusIp); 799 writeConfigFile(hostsFile, list); 800 801 for (int j = 0; j < numNameNodes; j++) { 802 refreshNodes(cluster.getNamesystem(j), conf); 803 804 DFSClient client = getDfsClient(cluster.getNameNode(j), conf); 805 DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); 806 for (int i = 0 ; i < 5 && info.length != 0; i++) { 807 LOG.info("Waiting for datanode to be marked dead"); 808 Thread.sleep(HEARTBEAT_INTERVAL * 1000); 809 info = client.datanodeReport(DatanodeReportType.LIVE); 810 } 811 assertEquals("Number of live nodes should be 0", 0, info.length); 812 813 // Test that bogus hostnames are considered "dead". 814 // The dead report should have an entry for the bogus entry in the hosts 815 // file. The original datanode is excluded from the report because it 816 // is no longer in the included list. 
817 info = client.datanodeReport(DatanodeReportType.DEAD); 818 assertEquals("There should be 1 dead node", 1, info.length); 819 assertEquals(bogusIp, info[0].getHostName()); 820 } 821 } 822 823 @Test(timeout=120000) testDecommissionWithOpenfile()824 public void testDecommissionWithOpenfile() throws IOException, InterruptedException { 825 LOG.info("Starting test testDecommissionWithOpenfile"); 826 827 //At most 4 nodes will be decommissioned 828 startCluster(1, 7, conf); 829 830 FileSystem fileSys = cluster.getFileSystem(0); 831 FSNamesystem ns = cluster.getNamesystem(0); 832 833 String openFile = "/testDecommissionWithOpenfile.dat"; 834 835 writeFile(fileSys, new Path(openFile), (short)3); 836 // make sure the file was open for write 837 FSDataOutputStream fdos = fileSys.append(new Path(openFile)); 838 839 LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(cluster.getNameNode(0), openFile, 0, fileSize); 840 841 DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations(); 842 DatanodeInfo[] dnInfos4FirstBlock = lbs.get(0).getLocations(); 843 844 ArrayList<String> nodes = new ArrayList<String>(); 845 ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>(); 846 847 DatanodeManager dm = ns.getBlockManager().getDatanodeManager(); 848 for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) { 849 DatanodeInfo found = datanodeInfo; 850 for (DatanodeInfo dif: dnInfos4LastBlock) { 851 if (datanodeInfo.equals(dif)) { 852 found = null; 853 } 854 } 855 if (found != null) { 856 nodes.add(found.getXferAddr()); 857 dnInfos.add(dm.getDatanode(found)); 858 } 859 } 860 //decommission one of the 3 nodes which have last block 861 nodes.add(dnInfos4LastBlock[0].getXferAddr()); 862 dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0])); 863 864 writeConfigFile(excludeFile, nodes); 865 refreshNodes(ns, conf); 866 for (DatanodeInfo dn : dnInfos) { 867 waitNodeState(dn, AdminStates.DECOMMISSIONED); 868 } 869 870 fdos.close(); 871 } 872 873 /** 874 * Tests restart of 
namenode while datanode hosts are added to exclude file 875 **/ 876 @Test(timeout=360000) testDecommissionWithNamenodeRestart()877 public void testDecommissionWithNamenodeRestart()throws IOException, InterruptedException { 878 LOG.info("Starting test testDecommissionWithNamenodeRestart"); 879 int numNamenodes = 1; 880 int numDatanodes = 1; 881 int replicas = 1; 882 conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 883 DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); 884 conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5); 885 886 startCluster(numNamenodes, numDatanodes, conf); 887 Path file1 = new Path("testDecommissionWithNamenodeRestart.dat"); 888 FileSystem fileSys = cluster.getFileSystem(); 889 writeFile(fileSys, file1, replicas); 890 891 DFSClient client = getDfsClient(cluster.getNameNode(), conf); 892 DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE); 893 DatanodeID excludedDatanodeID = info[0]; 894 String excludedDatanodeName = info[0].getXferAddr(); 895 896 writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName))); 897 898 //Add a new datanode to cluster 899 cluster.startDataNodes(conf, 1, true, null, null, null, null); 900 numDatanodes+=1; 901 902 assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size()); 903 //Restart the namenode 904 cluster.restartNameNode(); 905 DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode( 906 cluster.getNamesystem(), excludedDatanodeID); 907 waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED); 908 909 // Ensure decommissioned datanode is not automatically shutdown 910 assertEquals("All datanodes must be alive", numDatanodes, 911 client.datanodeReport(DatanodeReportType.LIVE).length); 912 assertTrue("Checked if block was replicated after decommission.", 913 checkFile(fileSys, file1, replicas, datanodeInfo.getXferAddr(), 914 numDatanodes) == null); 915 916 cleanupFile(fileSys, file1); 917 // Restart the cluster and 
ensure recommissioned datanodes 918 // are allowed to register with the namenode 919 cluster.shutdown(); 920 startCluster(numNamenodes, numDatanodes, conf); 921 cluster.shutdown(); 922 } 923 924 /** 925 * Test using a "registration name" in a host include file. 926 * 927 * Registration names are DataNode names specified in the configuration by 928 * dfs.datanode.hostname. The DataNode will send this name to the NameNode 929 * as part of its registration. Registration names are helpful when you 930 * want to override the normal first result of DNS resolution on the 931 * NameNode. For example, a given datanode IP may map to two hostnames, 932 * and you may want to choose which hostname is used internally in the 933 * cluster. 934 * 935 * It is not recommended to use a registration name which is not also a 936 * valid DNS hostname for the DataNode. See HDFS-5237 for background. 937 */ 938 @Ignore 939 @Test(timeout=360000) testIncludeByRegistrationName()940 public void testIncludeByRegistrationName() throws Exception { 941 Configuration hdfsConf = new Configuration(conf); 942 // Any IPv4 address starting with 127 functions as a "loopback" address 943 // which is connected to the current host. So by choosing 127.0.0.100 944 // as our registration name, we have chosen a name which is also a valid 945 // way of reaching the local DataNode we're going to start. 946 // Typically, a registration name would be a hostname, but we don't want 947 // to deal with DNS in this test. 948 final String registrationName = "127.0.0.100"; 949 final String nonExistentDn = "127.0.0.10"; 950 hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName); 951 cluster = new MiniDFSCluster.Builder(hdfsConf) 952 .numDataNodes(1).checkDataNodeHostConfig(true) 953 .setupHostsFile(true).build(); 954 cluster.waitActive(); 955 956 // Set up an includes file that doesn't have our datanode. 
957 ArrayList<String> nodes = new ArrayList<String>(); 958 nodes.add(nonExistentDn); 959 writeConfigFile(hostsFile, nodes); 960 refreshNodes(cluster.getNamesystem(0), hdfsConf); 961 962 // Wait for the DN to be marked dead. 963 LOG.info("Waiting for DN to be marked as dead."); 964 final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf); 965 GenericTestUtils.waitFor(new Supplier<Boolean>() { 966 @Override 967 public Boolean get() { 968 BlockManagerTestUtil 969 .checkHeartbeat(cluster.getNamesystem().getBlockManager()); 970 try { 971 DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD); 972 return info.length == 1; 973 } catch (IOException e) { 974 LOG.warn("Failed to check dead DNs", e); 975 return false; 976 } 977 } 978 }, 500, 5000); 979 980 // Use a non-empty include file with our registration name. 981 // It should work. 982 int dnPort = cluster.getDataNodes().get(0).getXferPort(); 983 nodes = new ArrayList<String>(); 984 nodes.add(registrationName + ":" + dnPort); 985 writeConfigFile(hostsFile, nodes); 986 refreshNodes(cluster.getNamesystem(0), hdfsConf); 987 cluster.restartDataNode(0); 988 cluster.triggerHeartbeats(); 989 990 // Wait for the DN to come back. 
991 LOG.info("Waiting for DN to come back."); 992 GenericTestUtils.waitFor(new Supplier<Boolean>() { 993 @Override 994 public Boolean get() { 995 BlockManagerTestUtil 996 .checkHeartbeat(cluster.getNamesystem().getBlockManager()); 997 try { 998 DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE); 999 if (info.length == 1) { 1000 Assert.assertFalse(info[0].isDecommissioned()); 1001 Assert.assertFalse(info[0].isDecommissionInProgress()); 1002 assertEquals(registrationName, info[0].getHostName()); 1003 return true; 1004 } 1005 } catch (IOException e) { 1006 LOG.warn("Failed to check dead DNs", e); 1007 } 1008 return false; 1009 } 1010 }, 500, 5000); 1011 } 1012 1013 @Test(timeout=120000) testBlocksPerInterval()1014 public void testBlocksPerInterval() throws Exception { 1015 Configuration newConf = new Configuration(conf); 1016 org.apache.log4j.Logger.getLogger(DecommissionManager.class) 1017 .setLevel(Level.TRACE); 1018 // Turn the blocks per interval way down 1019 newConf.setInt( 1020 DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, 1021 3); 1022 // Disable the normal monitor runs 1023 newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1024 Integer.MAX_VALUE); 1025 startCluster(1, 3, newConf); 1026 final FileSystem fs = cluster.getFileSystem(); 1027 final DatanodeManager datanodeManager = 1028 cluster.getNamesystem().getBlockManager().getDatanodeManager(); 1029 final DecommissionManager decomManager = datanodeManager.getDecomManager(); 1030 1031 // Write a 3 block file, so each node has one block. Should scan 3 nodes. 
1032 DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA); 1033 doDecomCheck(datanodeManager, decomManager, 3); 1034 // Write another file, should only scan two 1035 DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short)3, 0xBAD1DEA); 1036 doDecomCheck(datanodeManager, decomManager, 2); 1037 // One more file, should only scan 1 1038 DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short)3, 0xBAD1DEA); 1039 doDecomCheck(datanodeManager, decomManager, 1); 1040 // blocks on each DN now exceeds limit, still scan at least one node 1041 DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short)3, 0xBAD1DEA); 1042 doDecomCheck(datanodeManager, decomManager, 1); 1043 } 1044 1045 @Deprecated 1046 @Test(timeout=120000) testNodesPerInterval()1047 public void testNodesPerInterval() throws Exception { 1048 Configuration newConf = new Configuration(conf); 1049 org.apache.log4j.Logger.getLogger(DecommissionManager.class) 1050 .setLevel(Level.TRACE); 1051 // Set the deprecated configuration key which limits the # of nodes per 1052 // interval 1053 newConf.setInt("dfs.namenode.decommission.nodes.per.interval", 1); 1054 // Disable the normal monitor runs 1055 newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1056 Integer.MAX_VALUE); 1057 startCluster(1, 3, newConf); 1058 final FileSystem fs = cluster.getFileSystem(); 1059 final DatanodeManager datanodeManager = 1060 cluster.getNamesystem().getBlockManager().getDatanodeManager(); 1061 final DecommissionManager decomManager = datanodeManager.getDecomManager(); 1062 1063 // Write a 3 block file, so each node has one block. Should scan 1 node 1064 // each time. 
1065 DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA); 1066 for (int i=0; i<3; i++) { 1067 doDecomCheck(datanodeManager, decomManager, 1); 1068 } 1069 } 1070 doDecomCheck(DatanodeManager datanodeManager, DecommissionManager decomManager, int expectedNumCheckedNodes)1071 private void doDecomCheck(DatanodeManager datanodeManager, 1072 DecommissionManager decomManager, int expectedNumCheckedNodes) 1073 throws IOException, ExecutionException, InterruptedException { 1074 // Decom all nodes 1075 ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); 1076 for (DataNode d: cluster.getDataNodes()) { 1077 DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), 1078 decommissionedNodes, 1079 AdminStates.DECOMMISSION_INPROGRESS); 1080 decommissionedNodes.add(dn); 1081 } 1082 // Run decom scan and check 1083 BlockManagerTestUtil.recheckDecommissionState(datanodeManager); 1084 assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes, 1085 decomManager.getNumNodesChecked()); 1086 // Recommission all nodes 1087 for (DatanodeInfo dn : decommissionedNodes) { 1088 recommissionNode(0, dn); 1089 } 1090 } 1091 1092 @Test(timeout=120000) testPendingNodes()1093 public void testPendingNodes() throws Exception { 1094 Configuration newConf = new Configuration(conf); 1095 org.apache.log4j.Logger.getLogger(DecommissionManager.class) 1096 .setLevel(Level.TRACE); 1097 // Only allow one node to be decom'd at a time 1098 newConf.setInt( 1099 DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, 1100 1); 1101 // Disable the normal monitor runs 1102 newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1103 Integer.MAX_VALUE); 1104 startCluster(1, 3, newConf); 1105 final FileSystem fs = cluster.getFileSystem(); 1106 final DatanodeManager datanodeManager = 1107 cluster.getNamesystem().getBlockManager().getDatanodeManager(); 1108 final DecommissionManager decomManager = datanodeManager.getDecomManager(); 1109 1110 // 
Keep a file open to prevent decom from progressing 1111 HdfsDataOutputStream open1 = 1112 (HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short)3); 1113 // Flush and trigger block reports so the block definitely shows up on NN 1114 open1.write(123); 1115 open1.hflush(); 1116 for (DataNode d: cluster.getDataNodes()) { 1117 DataNodeTestUtils.triggerBlockReport(d); 1118 } 1119 // Decom two nodes, so one is still alive 1120 ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList(); 1121 for (int i=0; i<2; i++) { 1122 final DataNode d = cluster.getDataNodes().get(i); 1123 DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), 1124 decommissionedNodes, 1125 AdminStates.DECOMMISSION_INPROGRESS); 1126 decommissionedNodes.add(dn); 1127 } 1128 1129 for (int i=2; i>=0; i--) { 1130 assertTrackedAndPending(decomManager, 0, i); 1131 BlockManagerTestUtil.recheckDecommissionState(datanodeManager); 1132 } 1133 1134 // Close file, try to decom the last node, should get stuck in tracked 1135 open1.close(); 1136 final DataNode d = cluster.getDataNodes().get(2); 1137 DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), 1138 decommissionedNodes, 1139 AdminStates.DECOMMISSION_INPROGRESS); 1140 decommissionedNodes.add(dn); 1141 BlockManagerTestUtil.recheckDecommissionState(datanodeManager); 1142 1143 assertTrackedAndPending(decomManager, 1, 0); 1144 } 1145 assertTrackedAndPending(DecommissionManager decomManager, int tracked, int pending)1146 private void assertTrackedAndPending(DecommissionManager decomManager, 1147 int tracked, int pending) { 1148 assertEquals("Unexpected number of tracked nodes", tracked, 1149 decomManager.getNumTrackedNodes()); 1150 assertEquals("Unexpected number of pending nodes", pending, 1151 decomManager.getNumPendingNodes()); 1152 } 1153 } 1154