/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class tests the decommissioning of nodes.
 */
public class TestDecommission {
  public static final Logger LOG = LoggerFactory.getLogger(TestDecommission
      .class);
  static final long seed = 0xDEADBEEFL;
  static final int blockSize = 8192;
  static final int fileSize = 16384;
  static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
  static final int BLOCKREPORT_INTERVAL_MSEC = 1000; // block report in msec
  static final int NAMENODE_REPLICATION_INTERVAL = 1; // replication interval

  final Random myrand = new Random();
  Path dir;
  Path hostsFile;
  Path excludeFile;
  FileSystem localFileSys;
  Configuration conf;
  MiniDFSCluster cluster = null;
  @Before
  public void setup() throws IOException {
    conf = new HdfsConfiguration();
    // Set up the hosts/exclude files.
    localFileSys = FileSystem.getLocal(conf);
    Path workingDir = localFileSys.getWorkingDirectory();
    dir = new Path(workingDir, PathUtils.getTestDirName(getClass()) + "/work-dir/decommission");
    hostsFile = new Path(dir, "hosts");
    excludeFile = new Path(dir, "exclude");

    // Setup conf
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
    conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
    conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);

    writeConfigFile(hostsFile, null);
    writeConfigFile(excludeFile, null);
  }

  @After
  public void teardown() throws IOException {
    cleanupFile(localFileSys, dir);
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  private void writeConfigFile(Path name, List<String> nodes)
    throws IOException {
    // delete if it already exists
    if (localFileSys.exists(name)) {
      localFileSys.delete(name, true);
    }

    FSDataOutputStream stm = localFileSys.create(name);

    if (nodes != null) {
      for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
        String node = it.next();
        stm.writeBytes(node);
        stm.writeBytes("\n");
      }
    }
    stm.close();
  }

  private void writeFile(FileSystem fileSys, Path name, int repl)
    throws IOException {
    // create and write a file that spans two blocks (fileSize = 2 * blockSize)
    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
        (short) repl, blockSize);
    byte[] buffer = new byte[fileSize];
    Random rand = new Random(seed);
    rand.nextBytes(buffer);
    stm.write(buffer);
    stm.close();
    LOG.info("Created file " + name + " with " + repl + " replicas.");
  }

  /**
   * Verify that the number of replicas is as expected for each block in
   * the given file.
   * For blocks with a decommissioned node, verify that their replication
   * is one more than what is specified.
   * For blocks without decommissioned nodes, verify that their replication is
   * equal to what is specified.
   *
   * @param downnode - if null, there is no decommissioned node for this file.
   * @return - null if no failure is found, else an error message string.
   */
  private static String checkFile(FileSystem fileSys, Path name, int repl,
    String downnode, int numDatanodes) throws IOException {
    boolean isNodeDown = (downnode != null);
    // need a raw stream
    assertTrue("Not HDFS:" + fileSys.getUri(),
        fileSys instanceof DistributedFileSystem);
    HdfsDataInputStream dis = (HdfsDataInputStream)
        fileSys.open(name);
    Collection<LocatedBlock> dinfo = dis.getAllBlocks();
    for (LocatedBlock blk : dinfo) { // for each block
      int hasdown = 0;
      DatanodeInfo[] nodes = blk.getLocations();
      for (int j = 0; j < nodes.length; j++) { // for each replica
        if (isNodeDown && nodes[j].getXferAddr().equals(downnode)) {
          hasdown++;
          // Downnode must actually be decommissioned
          if (!nodes[j].isDecommissioned()) {
            return "For block " + blk.getBlock() + " replica on " +
              nodes[j] + " is given as downnode, " +
              "but is not decommissioned";
          }
          // Decommissioned node (if any) should only be last node in list.
          if (j != nodes.length - 1) {
            return "For block " + blk.getBlock() + " decommissioned node "
              + nodes[j] + " was not last node in list: "
              + (j + 1) + " of " + nodes.length;
          }
          LOG.info("Block " + blk.getBlock() + " replica on " +
            nodes[j] + " is decommissioned.");
        } else {
          // Non-downnodes must not be decommissioned
          if (nodes[j].isDecommissioned()) {
            return "For block " + blk.getBlock() + " replica on " +
              nodes[j] + " is unexpectedly decommissioned";
          }
        }
      }

      LOG.info("Block " + blk.getBlock() + " has " + hasdown
        + " decommissioned replica.");
      if (Math.min(numDatanodes, repl + hasdown) != nodes.length) {
        return "Wrong number of replicas for block " + blk.getBlock() +
          ": " + nodes.length + ", expected " +
          Math.min(numDatanodes, repl + hasdown);
      }
    }
    return null;
  }

  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    fileSys.delete(name, true);
    assertTrue(!fileSys.exists(name));
  }

  /*
   * Decommission the DN with the given UUID, or one random node if
   * datanodeUuid is null, and wait for the node to reach the given
   * {@code waitForState}.
   */
  private DatanodeInfo decommissionNode(int nnIndex,
                                  String datanodeUuid,
                                  ArrayList<DatanodeInfo> decommissionedNodes,
                                  AdminStates waitForState)
    throws IOException {
    DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf);
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);

    //
    // pick one datanode randomly unless the caller specifies one.
    //
    int index = 0;
    if (datanodeUuid == null) {
      boolean found = false;
      while (!found) {
        index = myrand.nextInt(info.length);
        if (!info[index].isDecommissioned()) {
          found = true;
        }
      }
    } else {
      // The caller specifies a DN
      for (; index < info.length; index++) {
        if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
          break;
        }
      }
      if (index == info.length) {
        throw new IOException("invalid datanodeUuid " + datanodeUuid);
      }
    }
    String nodename = info[index].getXferAddr();
    LOG.info("Decommissioning node: " + nodename);

    // write nodename into the exclude file.
    ArrayList<String> nodes = new ArrayList<String>();
    if (decommissionedNodes != null) {
      for (DatanodeInfo dn : decommissionedNodes) {
        nodes.add(dn.getName());
      }
    }
    nodes.add(nodename);
    writeConfigFile(excludeFile, nodes);
    refreshNodes(cluster.getNamesystem(nnIndex), conf);
    DatanodeInfo ret = NameNodeAdapter.getDatanode(
        cluster.getNamesystem(nnIndex), info[index]);
    waitNodeState(ret, waitForState);
    return ret;
  }

  /* Ask a specific NN to stop decommission of the datanode and wait for it
   * to reach the NORMAL state.
   */
  private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode)
      throws IOException {
    LOG.info("Recommissioning node: " + decommissionedNode);
    writeConfigFile(excludeFile, null);
    refreshNodes(cluster.getNamesystem(nnIndex), conf);
    waitNodeState(decommissionedNode, AdminStates.NORMAL);
  }

  /*
   * Wait until the node reaches the given admin state.
   */
  private void waitNodeState(DatanodeInfo node,
                             AdminStates state) {
    boolean done = state == node.getAdminState();
    while (!done) {
      LOG.info("Waiting for node " + node + " to change state to "
          + state + " current state: " + node.getAdminState());
      try {
        // Poll twice per heartbeat interval.
        Thread.sleep(HEARTBEAT_INTERVAL * 500);
      } catch (InterruptedException e) {
        // nothing
      }
      done = state == node.getAdminState();
    }
    LOG.info("node " + node + " reached the state " + state);
  }

  /* Get DFSClient to the namenode */
  private static DFSClient getDfsClient(NameNode nn,
      Configuration conf) throws IOException {
    return new DFSClient(nn.getNameNodeAddress(), conf);
  }

  /* Validate cluster has expected number of datanodes */
  private static void validateCluster(DFSClient client, int numDNs)
      throws IOException {
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
    assertEquals("Number of Datanodes ", numDNs, info.length);
  }

  /** Start a MiniDFSCluster
   * @throws IOException */
  private void startCluster(int numNameNodes, int numDatanodes,
      Configuration conf) throws IOException {
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
        .numDataNodes(numDatanodes).build();
    cluster.waitActive();
    for (int i = 0; i < numNameNodes; i++) {
      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
      validateCluster(client, numDatanodes);
    }
  }

  static void refreshNodes(final FSNamesystem ns, final Configuration conf)
      throws IOException {
    ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
  }

  private void verifyStats(NameNode namenode, FSNamesystem fsn,
      DatanodeInfo info, DataNode node, boolean decommissioning)
      throws InterruptedException, IOException {
    // Do the stats check over 10 heartbeats
    for (int i = 0; i < 10; i++) {
      long[] newStats = namenode.getRpcServer().getStats();

      // For decommissioning nodes, ensure capacity of the DN is no longer
      // counted. Only used space of the DN is counted in cluster capacity
      assertEquals(newStats[0],
          decommissioning ? info.getDfsUsed() : info.getCapacity());

      // Ensure cluster used capacity is counted for both normal and
      // decommissioning nodes
      assertEquals(newStats[1], info.getDfsUsed());

      // For decommissioning nodes, remaining space from the DN is not counted
      assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());

      // Ensure transceiver count is same as that DN
      assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
      DataNodeTestUtils.triggerHeartbeat(node);
    }
  }

  /**
   * Tests decommission for a non-federated cluster.
   */
  @Test(timeout=360000)
  public void testDecommission() throws IOException {
    testDecommission(1, 6);
  }

  /**
   * Tests decommission when replicas on the target datanode cannot be
   * migrated to other datanodes to satisfy the replication factor. Make sure
   * the datanode doesn't get stuck in the decommissioning state.
   */
  @Test(timeout = 360000)
  public void testDecommission2() throws IOException {
    LOG.info("Starting test testDecommission2");
    int numNamenodes = 1;
    int numDatanodes = 4;
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
    startCluster(numNamenodes, numDatanodes, conf);

    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
        new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
    namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes));

    Path file1 = new Path("testDecommission2.dat");
    int replicas = 4;

    // Start decommissioning one namenode at a time
    ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0);
    FileSystem fileSys = cluster.getFileSystem(0);
    FSNamesystem ns = cluster.getNamesystem(0);

    writeFile(fileSys, file1, replicas);

    int deadDecomissioned = ns.getNumDecomDeadDataNodes();
    int liveDecomissioned = ns.getNumDecomLiveDataNodes();

    // Decommission one node. Verify that node is decommissioned.
    DatanodeInfo decomNode = decommissionNode(0, null, decommissionedNodes,
        AdminStates.DECOMMISSIONED);
    decommissionedNodes.add(decomNode);
    assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
    assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes());

    // Ensure decommissioned datanode is not automatically shutdown
    DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
    assertEquals("All datanodes must be alive", numDatanodes,
        client.datanodeReport(DatanodeReportType.LIVE).length);
    assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
        numDatanodes));
    cleanupFile(fileSys, file1);

    // Restart the cluster and ensure recommissioned datanodes
    // are allowed to register with the namenode
    cluster.shutdown();
    startCluster(1, 4, conf);
    cluster.shutdown();
  }

  /**
   * Tests decommission for a federated cluster.
   */
  @Test(timeout=360000)
  public void testDecommissionFederation() throws IOException {
    testDecommission(2, 2);
  }

  /**
   * Tests the decommission process on a standby NN.
   * Verify that admins can run "dfsadmin -refreshNodes" on the SBN and that
   * the decommission process can finish as long as admins run
   * "dfsadmin -refreshNodes" on the active NN.
   * The SBN used to mark excess replicas upon recommission. The SBN's pick
   * for the excess replica could be different from the one picked by the ANN.
   * That creates an inconsistent state and prevents the SBN from finishing
   * decommission.
   */
  @Test(timeout=360000)
  public void testDecommissionOnStandby() throws Exception {
    Configuration hdfsConf = new HdfsConfiguration(conf);
    hdfsConf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30000);
    hdfsConf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 2);

    // The time to wait so that the slow DN's heartbeat is considered old
    // by BlockPlacementPolicyDefault and thus will choose that DN for
    // excess replica.
    long slowHeartbeatDNwaitTime =
        hdfsConf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000 * (hdfsConf.getInt(
        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT) + 1);

    cluster = new MiniDFSCluster.Builder(hdfsConf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(3).build();

    cluster.transitionToActive(0);
    cluster.waitActive();

    // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs.
    // The last DN is empty. Also configure the last DN to have slow heartbeat
    // so that it will be chosen as excess replica candidate during recommission.

    // Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
    // same as # of DNs, each DN will have a replica for any block.
    Path file1 = new Path("testDecommissionHA.dat");
    int replicas = 3;
    FileSystem activeFileSys = cluster.getFileSystem(0);
    writeFile(activeFileSys, file1, replicas);

    HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
        cluster.getNameNode(1));

    // Step 1.b, start a DN with slow heartbeat, so that we can know for sure it
    // will be chosen as the target of excess replica during recommission.
    hdfsConf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
    cluster.startDataNodes(hdfsConf, 1, true, null, null, null);
    DataNode lastDN = cluster.getDataNodes().get(3);
    lastDN.getDatanodeUuid();

    // Step 2, decommission the first DN at both ANN and SBN.
    DataNode firstDN = cluster.getDataNodes().get(0);

    // Step 2.a, ask ANN to decomm the first DN
    DatanodeInfo decommissionedNodeFromANN = decommissionNode(
        0, firstDN.getDatanodeUuid(), null, AdminStates.DECOMMISSIONED);

    // Step 2.b, ask SBN to decomm the first DN
    DatanodeInfo decomNodeFromSBN = decommissionNode(1, firstDN.getDatanodeUuid(), null,
        AdminStates.DECOMMISSIONED);

    // Step 3, recommission the first DN on SBN and ANN to create excess replica
    // It recommissions the node on SBN first to create potential
    // inconsistent state. In a production cluster, such inconsistent state can
    // happen even if the recommission command was issued on ANN first, given
    // the async nature of the system.

    // Step 3.a, ask SBN to recomm the first DN.
    // SBN has been fixed so that it no longer invalidates excess replica during
    // recommission.
    // Before the fix, SBN could get into the following state.
    //    1. the last DN would have been chosen as excess replica, given its
    //    heartbeat is considered old.
    //    Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
    //    2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
    //    and one excess replica ( 3 )
    // After the fix,
    //    After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
    Thread.sleep(slowHeartbeatDNwaitTime);
    recommissionNode(1, decomNodeFromSBN);

    // Step 3.b, ask ANN to recommission the first DN.
    // To verify the fix, the test makes sure the excess replica picked by ANN
    // is different from the one picked by SBN before the fix.
    // To achieve that, we make sure the next-to-last DN is chosen as excess
    // replica by ANN.
    // 1. restore LastDNprop's heartbeat interval.
    // 2. Make the next-to-last DN's heartbeat slow.
    MiniDFSCluster.DataNodeProperties LastDNprop = cluster.stopDataNode(3);
    LastDNprop.conf.setLong(
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
    cluster.restartDataNode(LastDNprop);

    MiniDFSCluster.DataNodeProperties nextToLastDNprop = cluster.stopDataNode(2);
    nextToLastDNprop.conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
    cluster.restartDataNode(nextToLastDNprop);
    cluster.waitActive();
    Thread.sleep(slowHeartbeatDNwaitTime);
    recommissionNode(0, decommissionedNodeFromANN);

    // Step 3.c, make sure the DN has deleted the block and reported to the NNs
    cluster.triggerHeartbeats();
    HATestUtil.waitForDNDeletions(cluster);
    cluster.triggerDeletionReports();

    // Step 4, decommission the first DN on both ANN and SBN
    // With the fix to make sure SBN no longer marks excess replica
    // during recommission, SBN's decommission can finish properly
    decommissionNode(0, firstDN.getDatanodeUuid(), null,
        AdminStates.DECOMMISSIONED);

    // Ask SBN to decomm the first DN
    decommissionNode(1, firstDN.getDatanodeUuid(), null,
        AdminStates.DECOMMISSIONED);

    cluster.shutdown();
  }

  private void testDecommission(int numNamenodes, int numDatanodes)
      throws IOException {
    LOG.info("Starting test testDecommission");
    startCluster(numNamenodes, numDatanodes, conf);

    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
        new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
    for (int i = 0; i < numNamenodes; i++) {
      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
    }
    Path file1 = new Path("testDecommission.dat");
    for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
      int replicas = numDatanodes - iteration - 1;

      // Start decommissioning one namenode at a time
      for (int i = 0; i < numNamenodes; i++) {
        ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
        FileSystem fileSys = cluster.getFileSystem(i);
        FSNamesystem ns = cluster.getNamesystem(i);

        writeFile(fileSys, file1, replicas);

        int deadDecomissioned = ns.getNumDecomDeadDataNodes();
        int liveDecomissioned = ns.getNumDecomLiveDataNodes();

        // Decommission one node. Verify that node is decommissioned.
        DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
            AdminStates.DECOMMISSIONED);
        decommissionedNodes.add(decomNode);
        assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
        assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes());

        // Ensure decommissioned datanode is not automatically shutdown
        DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
        assertEquals("All datanodes must be alive", numDatanodes,
            client.datanodeReport(DatanodeReportType.LIVE).length);
        // wait for the block to be replicated
        int tries = 0;
        while (tries++ < 20) {
          try {
            Thread.sleep(1000);
            if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
                numDatanodes) == null) {
              break;
            }
          } catch (InterruptedException ie) {
          }
        }
        assertTrue("Checked if block was replicated after decommission, tried "
            + tries + " times.", tries < 20);
        cleanupFile(fileSys, file1);
      }
    }

    // Restart the cluster and ensure decommissioned datanodes
    // are allowed to register with the namenode
    cluster.shutdown();
    startCluster(numNamenodes, numDatanodes, conf);
    cluster.shutdown();
  }

  /**
   * Test that over-replicated blocks are deleted on recommission.
   */
  @Test(timeout=120000)
  public void testRecommission() throws Exception {
    final int numDatanodes = 6;
    try {
      LOG.info("Starting test testRecommission");

      startCluster(1, numDatanodes, conf);

      final Path file1 = new Path("testDecommission.dat");
      final int replicas = numDatanodes - 1;

      ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
      final FileSystem fileSys = cluster.getFileSystem();

      // Write a file to n-1 datanodes
      writeFile(fileSys, file1, replicas);

      // Decommission one of the datanodes with a replica
      BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0];
      assertEquals("Unexpected number of replicas from getFileBlockLocations",
          replicas, loc.getHosts().length);
      final String toDecomHost = loc.getNames()[0];
      String toDecomUuid = null;
      for (DataNode d : cluster.getDataNodes()) {
        if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) {
          toDecomUuid = d.getDatanodeId().getDatanodeUuid();
          break;
        }
      }
      assertNotNull("Could not find a dn with the block!", toDecomUuid);
      final DatanodeInfo decomNode =
          decommissionNode(0, toDecomUuid, decommissionedNodes,
              AdminStates.DECOMMISSIONED);
      decommissionedNodes.add(decomNode);
      final BlockManager blockManager =
          cluster.getNamesystem().getBlockManager();
      final DatanodeManager datanodeManager =
          blockManager.getDatanodeManager();
      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);

      // Ensure decommissioned datanode is not automatically shutdown
      DFSClient client = getDfsClient(cluster.getNameNode(), conf);
      assertEquals("All datanodes must be alive", numDatanodes,
          client.datanodeReport(DatanodeReportType.LIVE).length);

      // wait for the block to be replicated
      final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1);
      final String uuid = toDecomUuid;
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          BlockInfoContiguous info =
              blockManager.getStoredBlock(b.getLocalBlock());
          int count = 0;
          StringBuilder sb = new StringBuilder("Replica locations: ");
          for (int i = 0; i < info.numNodes(); i++) {
            DatanodeDescriptor dn = info.getDatanode(i);
            sb.append(dn + ", ");
            if (!dn.getDatanodeUuid().equals(uuid)) {
              count++;
            }
          }
          LOG.info(sb.toString());
          LOG.info("Count: " + count);
          return count == replicas;
        }
      }, 500, 30000);

      // recommission and wait for over-replication to be fixed
      recommissionNode(0, decomNode);
      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
      DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0);

      cleanupFile(fileSys, file1);
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Tests cluster storage statistics during decommissioning for a
   * non-federated cluster.
   */
  @Test(timeout=360000)
  public void testClusterStats() throws Exception {
    testClusterStats(1);
  }

  /**
   * Tests cluster storage statistics during decommissioning for a
   * federated cluster.
   */
  @Test(timeout=360000)
  public void testClusterStatsFederation() throws Exception {
    testClusterStats(3);
  }

  public void testClusterStats(int numNameNodes) throws IOException,
      InterruptedException {
    LOG.info("Starting test testClusterStats");
    int numDatanodes = 1;
    startCluster(numNameNodes, numDatanodes, conf);

    for (int i = 0; i < numNameNodes; i++) {
      FileSystem fileSys = cluster.getFileSystem(i);
      Path file = new Path("testClusterStats.dat");
      writeFile(fileSys, file, 1);

      FSNamesystem fsn = cluster.getNamesystem(i);
      NameNode namenode = cluster.getNameNode(i);

      DatanodeInfo decomInfo = decommissionNode(i, null, null,
          AdminStates.DECOMMISSION_INPROGRESS);
      DataNode decomNode = getDataNode(decomInfo);
      // Check namenode stats for multiple datanode heartbeats
      verifyStats(namenode, fsn, decomInfo, decomNode, true);

      // Stop decommissioning and verify stats
      writeConfigFile(excludeFile, null);
      refreshNodes(fsn, conf);
      DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo);
      DataNode retNode = getDataNode(decomInfo);
      waitNodeState(retInfo, AdminStates.NORMAL);
      verifyStats(namenode, fsn, retInfo, retNode, false);
    }
  }

  private DataNode getDataNode(DatanodeInfo decomInfo) {
    DataNode decomNode = null;
    for (DataNode dn : cluster.getDataNodes()) {
      if (decomInfo.equals(dn.getDatanodeId())) {
        decomNode = dn;
        break;
      }
    }
    assertNotNull("Could not find decomNode in cluster!", decomNode);
    return decomNode;
  }

  /**
   * Test host/include file functionality. Only datanodes
   * in the include file are allowed to connect to the namenode in a
   * non-federated cluster.
   */
  @Test(timeout=360000)
  public void testHostsFile() throws IOException, InterruptedException {
    // Test for a single namenode cluster
    testHostsFile(1);
  }

  /**
   * Test host/include file functionality. Only datanodes
   * in the include file are allowed to connect to the namenode in a
   * federated cluster.
   */
  @Test(timeout=360000)
  public void testHostsFileFederation() throws IOException, InterruptedException {
    // Test for a 3 namenode federated cluster
    testHostsFile(3);
  }

  public void testHostsFile(int numNameNodes) throws IOException,
      InterruptedException {
    int numDatanodes = 1;
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(numNameNodes))
        .numDataNodes(numDatanodes).setupHostsFile(true).build();
    cluster.waitActive();

    // Now rewrite the hosts file with only a bogus IP, so the datanode is
    // disallowed from talking to the namenode, resulting in its shutdown.
    ArrayList<String> list = new ArrayList<String>();
    final String bogusIp = "127.0.30.1";
    list.add(bogusIp);
    writeConfigFile(hostsFile, list);

    for (int j = 0; j < numNameNodes; j++) {
      refreshNodes(cluster.getNamesystem(j), conf);

      DFSClient client = getDfsClient(cluster.getNameNode(j), conf);
      DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
      for (int i = 0; i < 5 && info.length != 0; i++) {
        LOG.info("Waiting for datanode to be marked dead");
        Thread.sleep(HEARTBEAT_INTERVAL * 1000);
        info = client.datanodeReport(DatanodeReportType.LIVE);
      }
      assertEquals("Number of live nodes should be 0", 0, info.length);

      // Test that bogus hostnames are considered "dead".
      // The dead report should have an entry for the bogus entry in the hosts
      // file.  The original datanode is excluded from the report because it
      // is no longer in the included list.
      info = client.datanodeReport(DatanodeReportType.DEAD);
      assertEquals("There should be 1 dead node", 1, info.length);
      assertEquals(bogusIp, info[0].getHostName());
    }
  }

  @Test(timeout=120000)
  public void testDecommissionWithOpenfile() throws IOException, InterruptedException {
    LOG.info("Starting test testDecommissionWithOpenfile");

    // At most 4 nodes will be decommissioned
    startCluster(1, 7, conf);

    FileSystem fileSys = cluster.getFileSystem(0);
    FSNamesystem ns = cluster.getNamesystem(0);

    String openFile = "/testDecommissionWithOpenfile.dat";

    writeFile(fileSys, new Path(openFile), (short) 3);
    // make sure the file was open for write
    FSDataOutputStream fdos = fileSys.append(new Path(openFile));

    LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
        cluster.getNameNode(0), openFile, 0, fileSize);

    DatanodeInfo[] dnInfos4LastBlock = lbs.getLastLocatedBlock().getLocations();
    DatanodeInfo[] dnInfos4FirstBlock = lbs.get(0).getLocations();

    ArrayList<String> nodes = new ArrayList<String>();
    ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();

    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
    for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
      DatanodeInfo found = datanodeInfo;
      for (DatanodeInfo dif : dnInfos4LastBlock) {
        if (datanodeInfo.equals(dif)) {
          found = null;
        }
      }
      if (found != null) {
        nodes.add(found.getXferAddr());
        dnInfos.add(dm.getDatanode(found));
      }
    }
    // decommission one of the 3 nodes which have the last block
    nodes.add(dnInfos4LastBlock[0].getXferAddr());
    dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));

    writeConfigFile(excludeFile, nodes);
    refreshNodes(ns, conf);
    for (DatanodeInfo dn : dnInfos) {
      waitNodeState(dn, AdminStates.DECOMMISSIONED);
    }

    fdos.close();
  }

  /**
   * Tests restart of namenode while datanode hosts are added to exclude file
   **/
  @Test(timeout=360000)
  public void testDecommissionWithNamenodeRestart() throws IOException, InterruptedException {
    LOG.info("Starting test testDecommissionWithNamenodeRestart");
    int numNamenodes = 1;
    int numDatanodes = 1;
    int replicas = 1;
    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
        DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5);

    startCluster(numNamenodes, numDatanodes, conf);
    Path file1 = new Path("testDecommissionWithNamenodeRestart.dat");
    FileSystem fileSys = cluster.getFileSystem();
    writeFile(fileSys, file1, replicas);

    DFSClient client = getDfsClient(cluster.getNameNode(), conf);
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
    DatanodeID excludedDatanodeID = info[0];
    String excludedDatanodeName = info[0].getXferAddr();

    writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName)));

    // Add a new datanode to cluster
    cluster.startDataNodes(conf, 1, true, null, null, null, null);
    numDatanodes += 1;

    assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size());
    // Restart the namenode
    cluster.restartNameNode();
    DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode(
        cluster.getNamesystem(), excludedDatanodeID);
    waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED);

    // Ensure decommissioned datanode is not automatically shutdown
    assertEquals("All datanodes must be alive", numDatanodes,
        client.datanodeReport(DatanodeReportType.LIVE).length);
    assertTrue("Checked if block was replicated after decommission.",
        checkFile(fileSys, file1, replicas, datanodeInfo.getXferAddr(),
        numDatanodes) == null);

    cleanupFile(fileSys, file1);
    // Restart the cluster and ensure recommissioned datanodes
    // are allowed to register with the namenode
    cluster.shutdown();
    startCluster(numNamenodes, numDatanodes, conf);
    cluster.shutdown();
  }

  /**
   * Test using a "registration name" in a host include file.
   *
   * Registration names are DataNode names specified in the configuration by
   * dfs.datanode.hostname.  The DataNode will send this name to the NameNode
   * as part of its registration.  Registration names are helpful when you
   * want to override the normal first result of DNS resolution on the
   * NameNode.  For example, a given datanode IP may map to two hostnames,
   * and you may want to choose which hostname is used internally in the
   * cluster.
   *
   * It is not recommended to use a registration name which is not also a
   * valid DNS hostname for the DataNode.  See HDFS-5237 for background.
   */
  @Ignore
  @Test(timeout=360000)
  public void testIncludeByRegistrationName() throws Exception {
    Configuration hdfsConf = new Configuration(conf);
    // Any IPv4 address starting with 127 functions as a "loopback" address
    // which is connected to the current host.  So by choosing 127.0.0.100
    // as our registration name, we have chosen a name which is also a valid
    // way of reaching the local DataNode we're going to start.
    // Typically, a registration name would be a hostname, but we don't want
    // to deal with DNS in this test.
    final String registrationName = "127.0.0.100";
    final String nonExistentDn = "127.0.0.10";
    hdfsConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, registrationName);
    cluster = new MiniDFSCluster.Builder(hdfsConf)
        .numDataNodes(1).checkDataNodeHostConfig(true)
        .setupHostsFile(true).build();
    cluster.waitActive();

    // Set up an includes file that doesn't have our datanode.
    ArrayList<String> nodes = new ArrayList<String>();
    nodes.add(nonExistentDn);
    writeConfigFile(hostsFile, nodes);
    refreshNodes(cluster.getNamesystem(0), hdfsConf);

    // Wait for the DN to be marked dead.
    LOG.info("Waiting for DN to be marked as dead.");
    final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        BlockManagerTestUtil
            .checkHeartbeat(cluster.getNamesystem().getBlockManager());
        try {
          DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.DEAD);
          return info.length == 1;
        } catch (IOException e) {
          LOG.warn("Failed to check dead DNs", e);
          return false;
        }
      }
    }, 500, 5000);

    // Use a non-empty include file with our registration name.
    // It should work.
    int dnPort = cluster.getDataNodes().get(0).getXferPort();
    nodes = new ArrayList<String>();
    nodes.add(registrationName + ":" + dnPort);
    writeConfigFile(hostsFile, nodes);
    refreshNodes(cluster.getNamesystem(0), hdfsConf);
    cluster.restartDataNode(0);
    cluster.triggerHeartbeats();

    // Wait for the DN to come back.
    LOG.info("Waiting for DN to come back.");
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        BlockManagerTestUtil
            .checkHeartbeat(cluster.getNamesystem().getBlockManager());
        try {
          DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
          if (info.length == 1) {
            Assert.assertFalse(info[0].isDecommissioned());
            Assert.assertFalse(info[0].isDecommissionInProgress());
            assertEquals(registrationName, info[0].getHostName());
            return true;
          }
        } catch (IOException e) {
          LOG.warn("Failed to check live DNs", e);
        }
        return false;
      }
    }, 500, 5000);
  }

  @Test(timeout=120000)
  public void testBlocksPerInterval() throws Exception {
    Configuration newConf = new Configuration(conf);
    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
        .setLevel(Level.TRACE);
    // Turn the blocks per interval way down
    newConf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
        3);
    // Disable the normal monitor runs
    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
        Integer.MAX_VALUE);
    startCluster(1, 3, newConf);
    final FileSystem fs = cluster.getFileSystem();
    final DatanodeManager datanodeManager =
        cluster.getNamesystem().getBlockManager().getDatanodeManager();
    final DecommissionManager decomManager = datanodeManager.getDecomManager();

    // Write a file with replication 3, so each node has one block replica.
    // Should scan 3 nodes.
    DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
    doDecomCheck(datanodeManager, decomManager, 3);
    // Write another file, should only scan two
    DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short) 3, 0xBAD1DEA);
    doDecomCheck(datanodeManager, decomManager, 2);
    // One more file, should only scan 1
    DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short) 3, 0xBAD1DEA);
    doDecomCheck(datanodeManager, decomManager, 1);
    // blocks on each DN now exceed the limit, still scan at least one node
    DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short) 3, 0xBAD1DEA);
    doDecomCheck(datanodeManager, decomManager, 1);
  }

  @Deprecated
  @Test(timeout=120000)
  public void testNodesPerInterval() throws Exception {
    Configuration newConf = new Configuration(conf);
    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
        .setLevel(Level.TRACE);
    // Set the deprecated configuration key which limits the # of nodes per
    // interval
    newConf.setInt("dfs.namenode.decommission.nodes.per.interval", 1);
    // Disable the normal monitor runs
    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
        Integer.MAX_VALUE);
    startCluster(1, 3, newConf);
    final FileSystem fs = cluster.getFileSystem();
    final DatanodeManager datanodeManager =
        cluster.getNamesystem().getBlockManager().getDatanodeManager();
    final DecommissionManager decomManager = datanodeManager.getDecomManager();

    // Write a file with replication 3, so each node has one block replica.
    // Should scan 1 node each time.
    DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
    for (int i = 0; i < 3; i++) {
      doDecomCheck(datanodeManager, decomManager, 1);
    }
  }

  private void doDecomCheck(DatanodeManager datanodeManager,
      DecommissionManager decomManager, int expectedNumCheckedNodes)
      throws IOException, ExecutionException, InterruptedException {
    // Decom all nodes
    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
    for (DataNode d : cluster.getDataNodes()) {
      DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
          decommissionedNodes,
          AdminStates.DECOMMISSION_INPROGRESS);
      decommissionedNodes.add(dn);
    }
    // Run decom scan and check
    BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
    assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes,
        decomManager.getNumNodesChecked());
    // Recommission all nodes
    for (DatanodeInfo dn : decommissionedNodes) {
      recommissionNode(0, dn);
    }
  }

  @Test(timeout=120000)
  public void testPendingNodes() throws Exception {
    Configuration newConf = new Configuration(conf);
    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
        .setLevel(Level.TRACE);
    // Only allow one node to be decom'd at a time
    newConf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
        1);
    // Disable the normal monitor runs
    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
        Integer.MAX_VALUE);
    startCluster(1, 3, newConf);
    final FileSystem fs = cluster.getFileSystem();
    final DatanodeManager datanodeManager =
        cluster.getNamesystem().getBlockManager().getDatanodeManager();
    final DecommissionManager decomManager = datanodeManager.getDecomManager();

    // Keep a file open to prevent decom from progressing
    HdfsDataOutputStream open1 =
        (HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short) 3);
    // Flush and trigger block reports so the block definitely shows up on NN
    open1.write(123);
    open1.hflush();
    for (DataNode d : cluster.getDataNodes()) {
      DataNodeTestUtils.triggerBlockReport(d);
    }
    // Decom two nodes, so one is still alive
    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
    for (int i = 0; i < 2; i++) {
      final DataNode d = cluster.getDataNodes().get(i);
      DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
          decommissionedNodes,
          AdminStates.DECOMMISSION_INPROGRESS);
      decommissionedNodes.add(dn);
    }

    for (int i = 2; i >= 0; i--) {
      assertTrackedAndPending(decomManager, 0, i);
      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
    }

    // Close file, try to decom the last node, should get stuck in tracked
    open1.close();
    final DataNode d = cluster.getDataNodes().get(2);
    DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
        decommissionedNodes,
        AdminStates.DECOMMISSION_INPROGRESS);
    decommissionedNodes.add(dn);
    BlockManagerTestUtil.recheckDecommissionState(datanodeManager);

    assertTrackedAndPending(decomManager, 1, 0);
  }

  private void assertTrackedAndPending(DecommissionManager decomManager,
      int tracked, int pending) {
    assertEquals("Unexpected number of tracked nodes", tracked,
        decomManager.getNumTrackedNodes());
    assertEquals("Unexpected number of pending nodes", pending,
        decomManager.getNumPendingNodes());
  }
}