/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

/**
 * This is the base class for simulating a variety of situations
 * in which blocks are intentionally corrupted, unexpectedly modified,
 * and so on before a block report is sent.
 *
 * By overriding {@link #sendBlockReports}, derived classes can test
 * different variations of how block reports are split across storages
 * and messages.
 */
public abstract class BlockReportTestBase {
  public static final Log LOG = LogFactory.getLog(BlockReportTestBase.class);

  private static short REPL_FACTOR = 1;
  private static final int RAND_LIMIT = 2000;
  private static final long DN_RESCAN_INTERVAL = 1;
  private static final long DN_RESCAN_EXTRA_WAIT = 3 * DN_RESCAN_INTERVAL;
  private static final int DN_N0 = 0;
  private static final int FILE_START = 0;

  private static final int BLOCK_SIZE = 1024;
  private static final int NUM_BLOCKS = 10;
  private static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;

  protected MiniDFSCluster cluster;
  private DistributedFileSystem fs;

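  // Note: RAND_LIMIT doubles as a fixed seed here, so the pseudo-random
  // sequence (and hence the corrupted GS/length values used below) is
  // reproducible across runs.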
  private static final Random rand = new Random(RAND_LIMIT);

  private static Configuration conf;

  static {
    initLoggers();
    resetConfiguration();
  }

  @Before
  public void startUpCluster() throws IOException {
    REPL_FACTOR = 1; // Reset in case a test has modified the value
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
    fs = cluster.getFileSystem();
  }

  @After
  public void shutDownCluster() throws IOException {
    fs.close();
    cluster.shutdownDataNodes();
    cluster.shutdown();
  }

  protected static void resetConfiguration() {
    conf = new Configuration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
  }

  // Generate a block report, optionally corrupting the generation
  // stamp and/or length of one block.
  private static StorageBlockReport[] getBlockReports(
      DataNode dn, String bpid, boolean corruptOneBlockGs,
      boolean corruptOneBlockLen) {
    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpid);

    // Build one report per storage
    StorageBlockReport[] reports =
        new StorageBlockReport[perVolumeBlockLists.size()];
    boolean corruptedGs = false;
    boolean corruptedLen = false;

    int reportIndex = 0;
    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
      DatanodeStorage dnStorage = kvPair.getKey();
      BlockListAsLongs blockList = kvPair.getValue();

      // Walk the list of blocks until we find one each to corrupt the
      // generation stamp and length, if so requested.
      BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
      for (BlockReportReplica block : blockList) {
        if (corruptOneBlockGs && !corruptedGs) {
          long gsOld = block.getGenerationStamp();
          long gsNew;
          do {
            gsNew = rand.nextInt();
          } while (gsNew == gsOld);
          block.setGenerationStamp(gsNew);
          LOG.info("Corrupted the GS for block ID " + block);
          corruptedGs = true;
        } else if (corruptOneBlockLen && !corruptedLen) {
          long lenOld = block.getNumBytes();
          long lenNew;
          do {
            lenNew = rand.nextInt((int)lenOld - 1);
          } while (lenNew == lenOld);
          block.setNumBytes(lenNew);
          LOG.info("Corrupted the length for block ID " + block);
          corruptedLen = true;
        }
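        // Copy the replica before re-adding it: the BlockListAsLongs
        // iterator may reuse a single BlockReportReplica instance, so
        // holding a reference to the iterated object would be unsafe.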
        builder.add(new BlockReportReplica(block));
      }

      reports[reportIndex++] =
          new StorageBlockReport(dnStorage, builder.build());
    }

    return reports;
  }

  /**
   * Utility routine to send block reports to the NN, either in a single call
   * or reporting one storage per call.
   *
   * @throws IOException
   */
  protected abstract void sendBlockReports(DatanodeRegistration dnR, String poolId,
      StorageBlockReport[] reports) throws IOException;
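
  // A rough sketch of the two variations a subclass might implement
  // (assuming the three-argument BlockReportContext constructor; the
  // actual subclasses live elsewhere in the test suite):
  //
  //   // Combined: all storages in a single RPC
  //   cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
  //       new BlockReportContext(1, 0, System.nanoTime()));
  //
  //   // Per storage: one RPC per report
  //   for (int i = 0; i < reports.length; i++) {
  //     StorageBlockReport[] singleton = { reports[i] };
  //     cluster.getNameNodeRpc().blockReport(dnR, poolId, singleton,
  //         new BlockReportContext(reports.length, i, System.nanoTime()));
  //   }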

  /**
   * Writes a file, verifies it, and closes it. Then the lengths of the
   * blocks are messed up and a block report is forced.
   * The modified block lengths must be ignored by the NameNode.
   *
   * @throws java.io.IOException on an error
   */
  @Test(timeout=300000)
  public void blockReport_01() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");

    ArrayList<Block> blocks = prepareForRide(filePath, METHOD_NAME, FILE_SIZE);

    if(LOG.isDebugEnabled()) {
      LOG.debug("Number of blocks allocated " + blocks.size());
    }
    long[] oldLengths = new long[blocks.size()];
    int tempLen;
    for (int i = 0; i < blocks.size(); i++) {
      Block b = blocks.get(i);
      if(LOG.isDebugEnabled()) {
        LOG.debug("Block " + b.getBlockName() + " before\t" + "Size " +
            b.getNumBytes());
      }
      oldLengths[i] = b.getNumBytes();
      if(LOG.isDebugEnabled()) {
        LOG.debug("Setting new length");
      }
      tempLen = rand.nextInt(BLOCK_SIZE);
      b.set(b.getBlockId(), tempLen, b.getGenerationStamp());
      if(LOG.isDebugEnabled()) {
        LOG.debug("Block " + b.getBlockName() + " after\t " + "Size " +
            b.getNumBytes());
      }
    }
    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N0);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);

    List<LocatedBlock> blocksAfterReport =
      DFSTestUtil.getAllBlocks(fs.open(filePath));

    if(LOG.isDebugEnabled()) {
      LOG.debug("After mods: Number of blocks allocated " +
          blocksAfterReport.size());
    }

    for (int i = 0; i < blocksAfterReport.size(); i++) {
      ExtendedBlock b = blocksAfterReport.get(i).getBlock();
      assertEquals("Length of " + i + "th block is incorrect",
        oldLengths[i], b.getNumBytes());
    }
  }

  /**
   * Writes a file, verifies it, and closes it. Then a couple of random
   * blocks are removed and a block report is forced; the FSNamesystem is
   * pushed to recalculate the required DN activities, such as replication
   * and so on.
   * The number of missing and under-replicated blocks should be the same in
   * the case of a single-DN cluster.
   *
   * @throws IOException in case of errors
   */
  @Test(timeout=300000)
  public void blockReport_02() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    LOG.info("Running test " + METHOD_NAME);

    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    DFSTestUtil.createFile(fs, filePath,
      FILE_SIZE, REPL_FACTOR, rand.nextLong());

    // Muck around with the newly created blocks and delete some of them
    File dataDir = new File(cluster.getDataDirectory());
    assertTrue(dataDir.isDirectory());

    List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
    List<Integer> removedIndex = new ArrayList<Integer>();
    List<LocatedBlock> lBlocks =
      cluster.getNameNodeRpc().getBlockLocations(
          filePath.toString(), FILE_START,
          FILE_SIZE).getLocatedBlocks();

    while (removedIndex.size() != 2) {
      int newRemoveIndex = rand.nextInt(lBlocks.size());
      if (!removedIndex.contains(newRemoveIndex))
        removedIndex.add(newRemoveIndex);
    }

    for (Integer aRemovedIndex : removedIndex) {
      blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
    }

    if(LOG.isDebugEnabled()) {
      LOG.debug("Number of blocks allocated " + lBlocks.size());
    }

    final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
    for (ExtendedBlock b : blocks2Remove) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Removing the block " + b.getBlockName());
      }
      for (File f : findAllFiles(dataDir,
        new MyFileFilter(b.getBlockName(), true))) {
        DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
        if (!f.delete()) {
          LOG.warn("Couldn't delete " + b.getBlockName());
        } else {
          LOG.debug("Deleted file " + f.toString());
        }
      }
    }

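    // Give the DataNode's directory scanner (running every
    // DN_RESCAN_INTERVAL seconds, per resetConfiguration()) time to
    // notice the deleted replicas.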
    waitTil(TimeUnit.SECONDS.toMillis(DN_RESCAN_EXTRA_WAIT));

    // all blocks belong to the same file, hence same BP
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);

    BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
        .getBlockManager());

    printStats();

    assertEquals("Wrong number of MissingBlocks is found",
      blocks2Remove.size(), cluster.getNamesystem().getMissingBlocksCount());
    assertEquals("Wrong number of UnderReplicatedBlocks is found",
      blocks2Remove.size(), cluster.getNamesystem().getUnderReplicatedBlocks());
  }


  /**
   * Writes a file and closes it.
   * A block report is generated with a bad GS for a single block.
   * The block report is forced and the number of corrupt blocks is checked.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_03() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    writeFile(METHOD_NAME, FILE_SIZE, filePath);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N0);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
    assertThat("Wrong number of PendingDeletion blocks",
               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
  }

  /**
   * Writes a file and closes it.
   * A block report is generated with an extra block that the NameNode does
   * not know about. The block report is forced and the number of
   * PendingDeletion blocks is checked.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_04() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    DFSTestUtil.createFile(fs, filePath,
                           FILE_SIZE, REPL_FACTOR, rand.nextLong());


    DataNode dn = cluster.getDataNodes().get(DN_N0);
    // all blocks belong to the same file, hence same BP
    String poolId = cluster.getNamesystem().getBlockPoolId();

    // Create a bogus new block which will not be present on the namenode.
    ExtendedBlock b = new ExtendedBlock(
        poolId, rand.nextLong(), 1024L, rand.nextLong());
    dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
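    // The bogus replica now exists in RBW state on the DN only; since the
    // NameNode has no record of it, the forced block report below should
    // schedule it for deletion rather than mark it corrupt.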

    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
               cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
    assertThat("Wrong number of PendingDeletion blocks",
               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
  }

  /**
   * Creates a file and closes it.
   * A second datanode is then started in the cluster.
   * As soon as the replication process is completed, the test forces a
   * block report and checks that no under-replicated blocks are left.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_06() throws Exception {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;

    writeFile(METHOD_NAME, FILE_SIZE, filePath);
    startDNandWait(filePath, true);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N1);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();
    assertEquals("Wrong number of under-replicated blocks",
      0, cluster.getNamesystem().getUnderReplicatedBlocks());
  }

  /**
   * Similar to blockReport_03() but works with two DNs.
   * Writes a file and closes it.
   * A second datanode is started in the cluster.
   * As soon as the replication process is completed, the test finds a block
   * on the second DN and sets its GS to be less than the original.
   * This is the markBlockAsCorrupt case 3, so we expect one pending
   * deletion. A block report is forced and the number of corrupt blocks is
   * checked. Another block is then chosen and its length is set to less
   * than the original; a check for another corrupt block is performed after
   * yet another block report.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_07() throws Exception {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;

    // write file and start second node to be "older" than the original
    writeFile(METHOD_NAME, FILE_SIZE, filePath);
    startDNandWait(filePath, true);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N1);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
               cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
    assertThat("Wrong number of PendingDeletion blocks",
               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
    assertThat("Wrong number of PendingReplication blocks",
               cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));

    reports = getBlockReports(dn, poolId, false, true);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
    assertThat("Wrong number of PendingDeletion blocks",
               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
    assertThat("Wrong number of PendingReplication blocks",
               cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));

    printStats();

  }

  /**
   * The test sets the configuration parameters for a large block size and
   * restarts the single-node cluster.
   * Then it writes a file larger than the block size and closes it.
   * A second datanode is started in the cluster.
   * As soon as the replication process is started and at least one
   * TEMPORARY replica is found, the test forces a block report and checks
   * that the TEMPORARY replica isn't reported in it.
   * Finally, the configuration is restored to its original state.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_08() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;
    final int bytesChkSum = 1024 * 1000;

    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
    shutDownCluster();
    startUpCluster();

    try {
      ArrayList<Block> blocks =
        writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
      Block bl = findBlock(filePath, 12 * bytesChkSum);
      BlockChecker bc = new BlockChecker(filePath);
      bc.start();

      waitForTempReplica(bl, DN_N1);

      // all blocks belong to the same file, hence same BP
      DataNode dn = cluster.getDataNodes().get(DN_N1);
      String poolId = cluster.getNamesystem().getBlockPoolId();
      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
      StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
      sendBlockReports(dnR, poolId, reports);
      printStats();
      assertEquals("Wrong number of PendingReplication blocks",
        blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());

      try {
        bc.join();
      } catch (InterruptedException e) { }
    } finally {
      resetConfiguration(); // return the initial state of the configuration
    }
  }

  // Similar to blockReport_08 but corrupts the GS and length of the
  // TEMPORARY replica's block. Expect the same behaviour: the NN should
  // simply ignore this block.
  @Test(timeout=300000)
  public void blockReport_09() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;
    final int bytesChkSum = 1024 * 1000;

    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
    shutDownCluster();
    startUpCluster();
    // write file and start second node to be "older" than the original

    try {
      writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);

      Block bl = findBlock(filePath, 12 * bytesChkSum);
      BlockChecker bc = new BlockChecker(filePath);
      bc.start();

      waitForTempReplica(bl, DN_N1);

      // all blocks belong to the same file, hence same BP
      DataNode dn = cluster.getDataNodes().get(DN_N1);
      String poolId = cluster.getNamesystem().getBlockPoolId();
      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
      StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
      sendBlockReports(dnR, poolId, reports);
      printStats();
      assertEquals("Wrong number of PendingReplication blocks",
        2, cluster.getNamesystem().getPendingReplicationBlocks());

      try {
        bc.join();
      } catch (InterruptedException e) {}
    } finally {
      resetConfiguration(); // return the initial state of the configuration
    }
  }

  /**
   * Test for the case where one of the DNs in the pipeline is in the
   * process of doing a block report exactly when the block is closed.
   * In this case, the block report becomes delayed until after the
   * block is marked completed on the NN, and hence it reports an RBW
   * replica for a COMPLETE block. Such a report should not be marked
   * corrupt.
   * This is a regression test for HDFS-2791.
   */
  @Test(timeout=300000)
  public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
    final CountDownLatch brFinished = new CountDownLatch(1);
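    // DelayAnswer holds the stubbed RPC until proceed() is called
    // (waitForCall() below blocks until the RPC has actually arrived);
    // this override additionally counts down a latch once the delayed
    // block report has gone all the way through to the NN.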
    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
      @Override
      protected Object passThrough(InvocationOnMock invocation)
          throws Throwable {
        try {
          return super.passThrough(invocation);
        } finally {
          // inform the test that our block report went through.
          brFinished.countDown();
        }
      }
    };

    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");

    // Start a second DN for this test -- we're checking
    // what happens when one of the DNs is slowed for some reason.
    REPL_FACTOR = 2;
    startDNandWait(null, false);

    NameNode nn = cluster.getNameNode();

    FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
    try {
      AppendTestUtil.write(out, 0, 10);
      out.hflush();

      // Set up a spy so that we can delay the block report coming
      // from this node.
      DataNode dn = cluster.getDataNodes().get(0);
      DatanodeProtocolClientSideTranslatorPB spy =
        DataNodeTestUtils.spyOnBposToNN(dn, nn);

      Mockito.doAnswer(delayer)
        .when(spy).blockReport(
          Mockito.<DatanodeRegistration>anyObject(),
          Mockito.anyString(),
          Mockito.<StorageBlockReport[]>anyObject(),
          Mockito.<BlockReportContext>anyObject());

      // Force a block report to be generated. The block report will have
      // an RBW replica in it. Wait for the RPC to be sent, but block
      // it before it gets to the NN.
      dn.scheduleAllBlockReport(0);
      delayer.waitForCall();

    } finally {
      IOUtils.closeStream(out);
    }

    // Now that the stream is closed, the NN will have the block in COMPLETE
    // state.
    delayer.proceed();
    brFinished.await();

    // Verify that no replicas are marked corrupt, and that the
    // file is still readable.
    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
    assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
    DFSTestUtil.readFile(fs, filePath);

    // Ensure that the file is readable even from the DN that we futzed with.
    cluster.stopDataNode(1);
    DFSTestUtil.readFile(fs, filePath);
  }

  private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
    final boolean tooLongWait = false;
    final int TIMEOUT = 40000;

    if(LOG.isDebugEnabled()) {
      LOG.debug("Wait for datanode " + DN_N1 + " to appear");
    }
    while (cluster.getDataNodes().size() <= DN_N1) {
      waitTil(20);
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
    }
    cluster.waitActive();

    // Look on the specified DN for the replica of the block from the first DN
    final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
    String bpid = cluster.getNamesystem().getBlockPoolId();
    Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
    long start = Time.monotonicNow();
    int count = 0;
    while (r == null) {
      waitTil(5);
      r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
      long waiting_period = Time.monotonicNow() - start;
      if (count++ % 100 == 0)
        if(LOG.isDebugEnabled()) {
          LOG.debug("Has been waiting for " + waiting_period + " ms.");
        }
      if (waiting_period > TIMEOUT)
        assertTrue("Was waiting too long to get ReplicaInfo from a datanode",
          tooLongWait);
    }

    HdfsServerConstants.ReplicaState state = r.getState();
    if(LOG.isDebugEnabled()) {
      LOG.debug("Replica state before the loop " + state.getValue());
    }
    start = Time.monotonicNow();
    while (state != HdfsServerConstants.ReplicaState.TEMPORARY) {
      waitTil(5);
      state = r.getState();
      if(LOG.isDebugEnabled()) {
        LOG.debug("Keep waiting; " + bl.getBlockName() +
            " is in state " + state.getValue());
      }
      if (Time.monotonicNow() - start > TIMEOUT)
        assertTrue("Was waiting too long for a replica to become TEMPORARY",
          tooLongWait);
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug("Replica state after the loop " + state.getValue());
    }
  }

  // Helper methods from here below...
  // Write a file with replication factor 2 (the second data node is
  // started separately by the callers that need it).
  private ArrayList<Block> writeFile(final String METHOD_NAME,
                                               final long fileSize,
                                               Path filePath) {
    ArrayList<Block> blocks = null;
    try {
      REPL_FACTOR = 2;
      blocks = prepareForRide(filePath, METHOD_NAME, fileSize);
    } catch (IOException e) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Caught exception ", e);
      }
    }
    return blocks;
  }

  private void startDNandWait(Path filePath, boolean waitReplicas)
      throws IOException, InterruptedException, TimeoutException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Before next DN start: " + cluster.getDataNodes().size());
    }
    cluster.startDataNodes(conf, 1, true, null, null);
    cluster.waitClusterUp();
    ArrayList<DataNode> datanodes = cluster.getDataNodes();
    assertEquals(2, datanodes.size());

    if (LOG.isDebugEnabled()) {
      int lastDn = datanodes.size() - 1;
      LOG.debug("New datanode "
          + cluster.getDataNodes().get(lastDn).getDisplayName()
          + " has been started");
    }
    if (waitReplicas) {
      DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
    }
  }

  private ArrayList<Block> prepareForRide(final Path filePath,
                                          final String METHOD_NAME,
                                          long fileSize) throws IOException {
    LOG.info("Running test " + METHOD_NAME);

    DFSTestUtil.createFile(fs, filePath, fileSize,
      REPL_FACTOR, rand.nextLong());

    return locatedToBlocks(cluster.getNameNodeRpc()
      .getBlockLocations(filePath.toString(), FILE_START,
        fileSize).getLocatedBlocks(), null);
  }

  private void printStats() {
    BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
    if(LOG.isDebugEnabled()) {
      LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
      LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());
      LOG.debug("Under-replicated " + cluster.getNamesystem().
          getUnderReplicatedBlocks());
      LOG.debug("Pending delete " + cluster.getNamesystem().
          getPendingDeletionBlocks());
      LOG.debug("Pending replications " + cluster.getNamesystem().
          getPendingReplicationBlocks());
      LOG.debug("Excess " + cluster.getNamesystem().getExcessBlocks());
      LOG.debug("Total " + cluster.getNamesystem().getBlocksTotal());
    }
  }

  private ArrayList<Block> locatedToBlocks(final List<LocatedBlock> locatedBlks,
                                           List<Integer> positionsToRemove) {
    ArrayList<Block> newList = new ArrayList<Block>();
    for (int i = 0; i < locatedBlks.size(); i++) {
      if (positionsToRemove != null && positionsToRemove.contains(i)) {
        if(LOG.isDebugEnabled()) {
          LOG.debug(i + " block to be omitted");
        }
        continue;
      }
      newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock()));
    }
    return newList;
  }

  private void waitTil(long waitPeriod) {
    try { // Wait until the next re-scan
      Thread.sleep(waitPeriod);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private List<File> findAllFiles(File top, FilenameFilter mask) {
    if (top == null) return null;
    ArrayList<File> ret = new ArrayList<File>();
    for (File f : top.listFiles()) {
      if (f.isDirectory())
        ret.addAll(findAllFiles(f, mask));
      else if (mask.accept(f, f.getName()))
        ret.add(f);
    }
    return ret;
  }

  private class MyFileFilter implements FilenameFilter {
    private String nameToAccept = "";
    private boolean all = false;

    public MyFileFilter(String nameToAccept, boolean all) {
      if (nameToAccept == null)
        throw new IllegalArgumentException("Argument is not supposed to be null");
      this.nameToAccept = nameToAccept;
      this.all = all;
    }

    @Override
    public boolean accept(File file, String s) {
      if (all)
        return s != null && s.startsWith(nameToAccept);
      else
        return s != null && s.equals(nameToAccept);
    }
  }

  private static void initLoggers() {
    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
    GenericTestUtils.setLogLevel(BlockReportTestBase.LOG, Level.ALL);
  }

  private Block findBlock(Path path, long size) throws IOException {
    Block ret;
    List<LocatedBlock> lbs =
      cluster.getNameNodeRpc()
      .getBlockLocations(path.toString(),
        FILE_START, size).getLocatedBlocks();
    LocatedBlock lb = lbs.get(lbs.size() - 1);

    // Get block from the first DN
    ret = cluster.getDataNodes().get(DN_N0).
      data.getStoredBlock(lb.getBlock()
      .getBlockPoolId(), lb.getBlock().getBlockId());
    return ret;
  }

  private class BlockChecker extends Thread {
    final Path filePath;

    public BlockChecker(final Path filePath) {
      this.filePath = filePath;
    }

    @Override
    public void run() {
      try {
        startDNandWait(filePath, true);
      } catch (Exception e) {
        e.printStackTrace();
        Assert.fail("Failed to start BlockChecker: " + e);
      }
    }
  }
}