/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;

/**
 * This is the base class for simulating a variety of situations
 * when blocks are being intentionally corrupted, unexpectedly modified,
 * and so on before a block report is happening.
 *
 * By overriding {@link #sendBlockReports}, derived classes can test
 * different variations of how block reports are split across storages
 * and messages.
 */
public abstract class BlockReportTestBase {
  public static final Log LOG = LogFactory.getLog(BlockReportTestBase.class);

  private static short REPL_FACTOR = 1;
  private static final int RAND_LIMIT = 2000;
  private static final long DN_RESCAN_INTERVAL = 1;
  private static final long DN_RESCAN_EXTRA_WAIT = 3 * DN_RESCAN_INTERVAL;
  private static final int DN_N0 = 0;
  private static final int FILE_START = 0;

  private static final int BLOCK_SIZE = 1024;
  private static final int NUM_BLOCKS = 10;
  private static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;

  protected MiniDFSCluster cluster;
  private DistributedFileSystem fs;

  private static final Random rand = new Random(RAND_LIMIT);

  private static Configuration conf;

  static {
    initLoggers();
    resetConfiguration();
  }

  @Before
  public void startUpCluster() throws IOException {
    REPL_FACTOR = 1; // Reset in case a test has modified the value
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
    fs = cluster.getFileSystem();
  }

  @After
  public void shutDownCluster() throws IOException {
    fs.close();
    cluster.shutdownDataNodes();
    cluster.shutdown();
  }

  protected static void resetConfiguration() {
    conf = new Configuration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
  }

  // Generate a block report, optionally corrupting the generation
  // stamp and/or length of one block.
  private static StorageBlockReport[] getBlockReports(
      DataNode dn, String bpid, boolean corruptOneBlockGs,
      boolean corruptOneBlockLen) {
    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpid);

    // Send block report
    StorageBlockReport[] reports =
        new StorageBlockReport[perVolumeBlockLists.size()];
    boolean corruptedGs = false;
    boolean corruptedLen = false;

    int reportIndex = 0;
    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
      DatanodeStorage dnStorage = kvPair.getKey();
      BlockListAsLongs blockList = kvPair.getValue();

      // Walk the list of blocks until we find one each to corrupt the
      // generation stamp and length, if so requested.
      BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
      for (BlockReportReplica block : blockList) {
        if (corruptOneBlockGs && !corruptedGs) {
          long gsOld = block.getGenerationStamp();
          long gsNew;
          do {
            gsNew = rand.nextInt();
          } while (gsNew == gsOld);
          block.setGenerationStamp(gsNew);
          LOG.info("Corrupted the GS for block ID " + block);
          corruptedGs = true;
        } else if (corruptOneBlockLen && !corruptedLen) {
          long lenOld = block.getNumBytes();
          long lenNew;
          do {
            lenNew = rand.nextInt((int)lenOld - 1);
          } while (lenNew == lenOld);
          block.setNumBytes(lenNew);
          LOG.info("Corrupted the length for block ID " + block);
          corruptedLen = true;
        }
        builder.add(new BlockReportReplica(block));
      }

      reports[reportIndex++] =
          new StorageBlockReport(dnStorage, builder.build());
    }

    return reports;
  }

  /**
   * Utility routine to send block reports to the NN, either in a single call
   * or reporting one storage per call.
   *
   * @throws IOException
   */
  protected abstract void sendBlockReports(DatanodeRegistration dnR, String poolId,
      StorageBlockReport[] reports) throws IOException;

  /**
   * Test writes a file, verifies and closes it. Then the lengths of the blocks
   * are messed up and a block report is forced.
   * The modification of the blocks' lengths has to be ignored.
   *
   * @throws java.io.IOException on an error
   */
  @Test(timeout=300000)
  public void blockReport_01() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");

    ArrayList<Block> blocks = prepareForRide(filePath, METHOD_NAME, FILE_SIZE);

    if(LOG.isDebugEnabled()) {
      LOG.debug("Number of blocks allocated " + blocks.size());
    }
    long[] oldLengths = new long[blocks.size()];
    int tempLen;
    for (int i = 0; i < blocks.size(); i++) {
      Block b = blocks.get(i);
      if(LOG.isDebugEnabled()) {
        LOG.debug("Block " + b.getBlockName() + " before\t" + "Size " +
            b.getNumBytes());
      }
      oldLengths[i] = b.getNumBytes();
      if(LOG.isDebugEnabled()) {
        LOG.debug("Setting new length");
      }
      tempLen = rand.nextInt(BLOCK_SIZE);
      b.set(b.getBlockId(), tempLen, b.getGenerationStamp());
      if(LOG.isDebugEnabled()) {
        LOG.debug("Block " + b.getBlockName() + " after\t " + "Size " +
            b.getNumBytes());
      }
    }
    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N0);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);

    List<LocatedBlock> blocksAfterReport =
        DFSTestUtil.getAllBlocks(fs.open(filePath));

    if(LOG.isDebugEnabled()) {
      LOG.debug("After mods: Number of blocks allocated " +
          blocksAfterReport.size());
    }

    for (int i = 0; i < blocksAfterReport.size(); i++) {
      ExtendedBlock b = blocksAfterReport.get(i).getBlock();
      assertEquals("Length of " + i + "th block is incorrect",
          oldLengths[i], b.getNumBytes());
    }
  }

  /**
   * Test writes a file, verifies and closes it. Then a couple of random blocks
   * are removed and a block report is forced; the FSNamesystem is pushed to
   * recalculate required DN activities such as replications and so on.
   * The number of missing and under-replicated blocks should be the same in
   * the case of a single-DN cluster.
   *
   * @throws IOException in case of errors
   */
  @Test(timeout=300000)
  public void blockReport_02() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    LOG.info("Running test " + METHOD_NAME);

    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    DFSTestUtil.createFile(fs, filePath,
        FILE_SIZE, REPL_FACTOR, rand.nextLong());

    // mess around with the newly created blocks and delete some of them
    File dataDir = new File(cluster.getDataDirectory());
    assertTrue(dataDir.isDirectory());

    List<ExtendedBlock> blocks2Remove = new ArrayList<ExtendedBlock>();
    List<Integer> removedIndex = new ArrayList<Integer>();
    List<LocatedBlock> lBlocks =
        cluster.getNameNodeRpc().getBlockLocations(
            filePath.toString(), FILE_START,
            FILE_SIZE).getLocatedBlocks();

    while (removedIndex.size() != 2) {
      int newRemoveIndex = rand.nextInt(lBlocks.size());
      if (!removedIndex.contains(newRemoveIndex))
        removedIndex.add(newRemoveIndex);
    }

    for (Integer aRemovedIndex : removedIndex) {
      blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
    }

    if(LOG.isDebugEnabled()) {
      LOG.debug("Number of blocks allocated " + lBlocks.size());
    }

    final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
    for (ExtendedBlock b : blocks2Remove) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Removing the block " + b.getBlockName());
      }
      for (File f : findAllFiles(dataDir,
          new MyFileFilter(b.getBlockName(), true))) {
        DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
        if (!f.delete()) {
          LOG.warn("Couldn't delete " + b.getBlockName());
        } else {
          LOG.debug("Deleted file " + f.toString());
        }
      }
    }

    waitTil(TimeUnit.SECONDS.toMillis(DN_RESCAN_EXTRA_WAIT));

    // all blocks belong to the same file, hence same BP
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);

    BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
        .getBlockManager());

    printStats();

    assertEquals("Wrong number of MissingBlocks is found",
        blocks2Remove.size(), cluster.getNamesystem().getMissingBlocksCount());
    assertEquals("Wrong number of UnderReplicatedBlocks is found",
        blocks2Remove.size(), cluster.getNamesystem().getUnderReplicatedBlocks());
  }


  /**
   * Test writes a file and closes it.
   * A block report is generated with a bad GS for a single block.
   * The block report is forced and the check for # of corrupted blocks is performed.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_03() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    writeFile(METHOD_NAME, FILE_SIZE, filePath);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N0);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
        cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
    assertThat("Wrong number of PendingDeletion blocks",
        cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
  }

  /**
   * Test writes a file and closes it.
   * A block report is generated with an extra block.
   * The block report is forced and the check for # of pending deletion
   * blocks is performed.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_04() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    DFSTestUtil.createFile(fs, filePath,
        FILE_SIZE, REPL_FACTOR, rand.nextLong());


    DataNode dn = cluster.getDataNodes().get(DN_N0);
    // all blocks belong to the same file, hence same BP
    String poolId = cluster.getNamesystem().getBlockPoolId();

    // Create a bogus new block which will not be present on the namenode.
    ExtendedBlock b = new ExtendedBlock(
        poolId, rand.nextLong(), 1024L, rand.nextLong());
    dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);

    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
        cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
    assertThat("Wrong number of PendingDeletion blocks",
        cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
  }

  /**
   * Test creates a file and closes it.
   * The second datanode is started in the cluster.
   * As soon as the replication process is completed, the test runs a
   * block report and checks that no under-replicated blocks are left.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_06() throws Exception {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;

    writeFile(METHOD_NAME, FILE_SIZE, filePath);
    startDNandWait(filePath, true);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N1);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();
    assertEquals("Wrong number of PendingReplication Blocks",
        0, cluster.getNamesystem().getUnderReplicatedBlocks());
  }

  /**
   * Similar to blockReport_03() but works with two DNs.
   * Test writes a file and closes it.
   * The second datanode is started in the cluster.
   * As soon as the replication process is completed, the test finds a block from
   * the second DN and sets its GS to be less than the original one.
   * This is the markBlockAsCorrupt case 3, so we expect one pending deletion.
   * A block report is forced and the check for # of corrupted blocks is performed.
   * Another block is chosen and its length is set to less than the original.
   * A check for another corrupted block is performed after yet another
   * block report.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_07() throws Exception {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;

    // write file and start second node to be "older" than the original
    writeFile(METHOD_NAME, FILE_SIZE, filePath);
    startDNandWait(filePath, true);

    // all blocks belong to the same file, hence same BP
    DataNode dn = cluster.getDataNodes().get(DN_N1);
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
        cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
    assertThat("Wrong number of PendingDeletion blocks",
        cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
    assertThat("Wrong number of PendingReplication blocks",
        cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));

    reports = getBlockReports(dn, poolId, false, true);
    sendBlockReports(dnR, poolId, reports);
    printStats();

    assertThat("Wrong number of corrupt blocks",
        cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
    assertThat("Wrong number of PendingDeletion blocks",
        cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
    assertThat("Wrong number of PendingReplication blocks",
        cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));

    printStats();

  }

  /**
   * The test sets the configuration parameters for a large block size and
   * restarts the initiated single-node cluster.
   * Then it writes a file > block_size and closes it.
   * The second datanode is started in the cluster.
   * As soon as the replication process is started and at least one TEMPORARY
   * replica is found, the test forces the BlockReport process and checks
   * that the TEMPORARY replica isn't reported in it.
   * Finally, the configuration is restored to its original state.
   *
   * @throws IOException in case of an error
   */
  @Test(timeout=300000)
  public void blockReport_08() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;
    final int bytesChkSum = 1024 * 1000;

    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
    shutDownCluster();
    startUpCluster();

    try {
      ArrayList<Block> blocks =
          writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);
      Block bl = findBlock(filePath, 12 * bytesChkSum);
      BlockChecker bc = new BlockChecker(filePath);
      bc.start();

      waitForTempReplica(bl, DN_N1);

      // all blocks belong to the same file, hence same BP
      DataNode dn = cluster.getDataNodes().get(DN_N1);
      String poolId = cluster.getNamesystem().getBlockPoolId();
      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
      StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
      sendBlockReports(dnR, poolId, reports);
      printStats();
      assertEquals("Wrong number of PendingReplication blocks",
          blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());

      try {
        bc.join();
      } catch (InterruptedException e) { }
    } finally {
      resetConfiguration(); // return the initial state of the configuration
    }
  }

  // Similar to blockReport_08 but corrupts the GS and len of the TEMPORARY
  // replica block. Expect the same behaviour: the NN should simply ignore this
  // block.
  @Test(timeout=300000)
  public void blockReport_09() throws IOException {
    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");
    final int DN_N1 = DN_N0 + 1;
    final int bytesChkSum = 1024 * 1000;

    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytesChkSum);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 6 * bytesChkSum);
    shutDownCluster();
    startUpCluster();
    // write file and start second node to be "older" than the original

    try {
      writeFile(METHOD_NAME, 12 * bytesChkSum, filePath);

      Block bl = findBlock(filePath, 12 * bytesChkSum);
      BlockChecker bc = new BlockChecker(filePath);
      bc.start();

      waitForTempReplica(bl, DN_N1);

      // all blocks belong to the same file, hence same BP
      DataNode dn = cluster.getDataNodes().get(DN_N1);
      String poolId = cluster.getNamesystem().getBlockPoolId();
      DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
      StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
      sendBlockReports(dnR, poolId, reports);
      printStats();
      assertEquals("Wrong number of PendingReplication blocks",
          2, cluster.getNamesystem().getPendingReplicationBlocks());

      try {
        bc.join();
      } catch (InterruptedException e) {}
    } finally {
      resetConfiguration(); // return the initial state of the configuration
    }
  }

  /**
   * Test for the case where one of the DNs in the pipeline is in the
   * process of doing a block report exactly when the block is closed.
   * In this case, the block report becomes delayed until after the
   * block is marked completed on the NN, and hence it reports an RBW
   * replica for a COMPLETE block. Such a report should not be marked
   * corrupt.
   * This is a regression test for HDFS-2791.
   */
  @Test(timeout=300000)
  public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
    final CountDownLatch brFinished = new CountDownLatch(1);
    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
      @Override
      protected Object passThrough(InvocationOnMock invocation)
          throws Throwable {
        try {
          return super.passThrough(invocation);
        } finally {
          // inform the test that our block report went through.
          brFinished.countDown();
        }
      }
    };

    final String METHOD_NAME = GenericTestUtils.getMethodName();
    Path filePath = new Path("/" + METHOD_NAME + ".dat");

    // Start a second DN for this test -- we're checking
    // what happens when one of the DNs is slowed for some reason.
    REPL_FACTOR = 2;
    startDNandWait(null, false);

    NameNode nn = cluster.getNameNode();

    FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
    try {
      AppendTestUtil.write(out, 0, 10);
      out.hflush();

      // Set up a spy so that we can delay the block report coming
      // from this node.
      DataNode dn = cluster.getDataNodes().get(0);
      DatanodeProtocolClientSideTranslatorPB spy =
          DataNodeTestUtils.spyOnBposToNN(dn, nn);

      Mockito.doAnswer(delayer)
          .when(spy).blockReport(
              Mockito.<DatanodeRegistration>anyObject(),
              Mockito.anyString(),
              Mockito.<StorageBlockReport[]>anyObject(),
              Mockito.<BlockReportContext>anyObject());

      // Force a block report to be generated. The block report will have
      // an RBW replica in it. Wait for the RPC to be sent, but block
      // it before it gets to the NN.
      dn.scheduleAllBlockReport(0);
      delayer.waitForCall();

    } finally {
      IOUtils.closeStream(out);
    }

    // Now that the stream is closed, the NN will have the block in COMPLETE
    // state.
    delayer.proceed();
    brFinished.await();

    // Verify that no replicas are marked corrupt, and that the
    // file is still readable.
    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
    assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
    DFSTestUtil.readFile(fs, filePath);

    // Ensure that the file is readable even from the DN that we futzed with.
    cluster.stopDataNode(1);
    DFSTestUtil.readFile(fs, filePath);
  }

  private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
    final boolean tooLongWait = false;
    final int TIMEOUT = 40000;

    if(LOG.isDebugEnabled()) {
      LOG.debug("Wait for datanode " + DN_N1 + " to appear");
    }
    while (cluster.getDataNodes().size() <= DN_N1) {
      waitTil(20);
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug("Total number of DNs " + cluster.getDataNodes().size());
    }
    cluster.waitActive();

    // Look on the specified DN for the replica of the block from the 1st DN
    final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
    String bpid = cluster.getNamesystem().getBlockPoolId();
    Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
    long start = Time.monotonicNow();
    int count = 0;
    while (r == null) {
      waitTil(5);
      r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
      long waiting_period = Time.monotonicNow() - start;
      if (count++ % 100 == 0)
        if(LOG.isDebugEnabled()) {
          LOG.debug("Has been waiting for " + waiting_period + " ms.");
        }
      if (waiting_period > TIMEOUT)
        assertTrue("Was waiting too long to get ReplicaInfo from a datanode",
            tooLongWait);
    }

    HdfsServerConstants.ReplicaState state = r.getState();
    if(LOG.isDebugEnabled()) {
      LOG.debug("Replica state before the loop " + state.getValue());
    }
    start = Time.monotonicNow();
    while (state != HdfsServerConstants.ReplicaState.TEMPORARY) {
      waitTil(5);
      state = r.getState();
      if(LOG.isDebugEnabled()) {
        LOG.debug("Keep waiting for " + bl.getBlockName() +
            " is in state " + state.getValue());
      }
      if (Time.monotonicNow() - start > TIMEOUT)
        assertTrue("Was waiting too long for a replica to become TEMPORARY",
            tooLongWait);
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug("Replica state after the loop " + state.getValue());
    }
  }

  // Helper methods from here below...
  // Write file and start second data node.
  private ArrayList<Block> writeFile(final String METHOD_NAME,
                                     final long fileSize,
                                     Path filePath) {
    ArrayList<Block> blocks = null;
    try {
      REPL_FACTOR = 2;
      blocks = prepareForRide(filePath, METHOD_NAME, fileSize);
    } catch (IOException e) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Caught exception ", e);
      }
    }
    return blocks;
  }

  private void startDNandWait(Path filePath, boolean waitReplicas)
      throws IOException, InterruptedException, TimeoutException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Before next DN start: " + cluster.getDataNodes().size());
    }
    cluster.startDataNodes(conf, 1, true, null, null);
    cluster.waitClusterUp();
    ArrayList<DataNode> datanodes = cluster.getDataNodes();
    assertEquals(datanodes.size(), 2);

    if (LOG.isDebugEnabled()) {
      int lastDn = datanodes.size() - 1;
      LOG.debug("New datanode "
          + cluster.getDataNodes().get(lastDn).getDisplayName()
          + " has been started");
    }
    if (waitReplicas) {
      DFSTestUtil.waitReplication(fs, filePath, REPL_FACTOR);
    }
  }

  private ArrayList<Block> prepareForRide(final Path filePath,
                                          final String METHOD_NAME,
                                          long fileSize) throws IOException {
    LOG.info("Running test " + METHOD_NAME);

    DFSTestUtil.createFile(fs, filePath, fileSize,
        REPL_FACTOR, rand.nextLong());

    return locatedToBlocks(cluster.getNameNodeRpc()
        .getBlockLocations(filePath.toString(), FILE_START,
            fileSize).getLocatedBlocks(), null);
  }

  private void printStats() {
    BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
    if(LOG.isDebugEnabled()) {
      LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
      LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());
      LOG.debug("Under-replicated " + cluster.getNamesystem().
          getUnderReplicatedBlocks());
      LOG.debug("Pending delete " + cluster.getNamesystem().
          getPendingDeletionBlocks());
      LOG.debug("Pending replications " + cluster.getNamesystem().
          getPendingReplicationBlocks());
      LOG.debug("Excess " + cluster.getNamesystem().getExcessBlocks());
      LOG.debug("Total " + cluster.getNamesystem().getBlocksTotal());
    }
  }

  private ArrayList<Block> locatedToBlocks(final List<LocatedBlock> locatedBlks,
                                           List<Integer> positionsToRemove) {
    ArrayList<Block> newList = new ArrayList<Block>();
    for (int i = 0; i < locatedBlks.size(); i++) {
      if (positionsToRemove != null && positionsToRemove.contains(i)) {
        if(LOG.isDebugEnabled()) {
          LOG.debug(i + " block to be omitted");
        }
        continue;
      }
      newList.add(new Block(locatedBlks.get(i).getBlock().getLocalBlock()));
    }
    return newList;
  }

  private void waitTil(long waitPeriod) {
    try { // Wait till the next re-scan
      Thread.sleep(waitPeriod);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private List<File> findAllFiles(File top, FilenameFilter mask) {
    if (top == null) return null;
    ArrayList<File> ret = new ArrayList<File>();
    for (File f : top.listFiles()) {
      if (f.isDirectory())
        ret.addAll(findAllFiles(f, mask));
      else if (mask.accept(f, f.getName()))
        ret.add(f);
    }
    return ret;
  }

  private class MyFileFilter implements FilenameFilter {
    private String nameToAccept = "";
    private boolean all = false;

    public MyFileFilter(String nameToAccept, boolean all) {
      if (nameToAccept == null)
        throw new IllegalArgumentException("Argument isn't supposed to be null");
      this.nameToAccept = nameToAccept;
      this.all = all;
    }

    @Override
    public boolean accept(File file, String s) {
      if (all)
        return s != null && s.startsWith(nameToAccept);
      else
        return s != null && s.equals(nameToAccept);
    }
  }

  private static void initLoggers() {
    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
    GenericTestUtils.setLogLevel(BlockReportTestBase.LOG, Level.ALL);
  }

  private Block findBlock(Path path, long size) throws IOException {
    Block ret;
    List<LocatedBlock> lbs =
        cluster.getNameNodeRpc()
            .getBlockLocations(path.toString(),
                FILE_START, size).getLocatedBlocks();
    LocatedBlock lb = lbs.get(lbs.size() - 1);

    // Get block from the first DN
    ret = cluster.getDataNodes().get(DN_N0).
        data.getStoredBlock(lb.getBlock()
            .getBlockPoolId(), lb.getBlock().getBlockId());
    return ret;
  }

  private class BlockChecker extends Thread {
    final Path filePath;

    public BlockChecker(final Path filePath) {
      this.filePath = filePath;
    }

    @Override
    public void run() {
      try {
        startDNandWait(filePath, true);
      } catch (Exception e) {
        e.printStackTrace();
        Assert.fail("Failed to start BlockChecker: " + e);
      }
    }
  }
}
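
// The class below is not part of the original file. It is a minimal illustrative sketch of
// how a concrete subclass might override sendBlockReports(), here reporting each storage to
// the NameNode in its own RPC (the other obvious variant is one RPC carrying all storages).
// The class name is hypothetical, and the three-argument BlockReportContext constructor
// (totalRpcs, curRpc, reportId) is an assumption; its arity differs across Hadoop versions.
class PerStorageBlockReportExample extends BlockReportTestBase {

  @Override
  protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
      StorageBlockReport[] reports) throws IOException {
    // A single report id ties the per-storage RPCs together as parts of one logical report.
    final long reportId = System.nanoTime();
    int i = 0;
    for (StorageBlockReport report : reports) {
      // Wrap each storage's report in a singleton array so the NN sees the
      // full block report split across several RPCs.
      StorageBlockReport[] singletonReport = { report };
      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
          new BlockReportContext(reports.length, i, reportId));
      i++;
    }
  }
}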