1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hdfs.server.datanode; 20 21 import static org.junit.Assert.fail; 22 import static org.mockito.Matchers.any; 23 import static org.mockito.Matchers.anyBoolean; 24 import static org.mockito.Matchers.anyListOf; 25 import static org.mockito.Matchers.anyLong; 26 import static org.mockito.Matchers.anyObject; 27 import static org.mockito.Mockito.doReturn; 28 import static org.mockito.Mockito.doThrow; 29 import static org.mockito.Mockito.mock; 30 import static org.mockito.Mockito.never; 31 import static org.mockito.Mockito.spy; 32 import static org.mockito.Mockito.verify; 33 import static org.mockito.Mockito.when; 34 35 import java.io.File; 36 import java.io.IOException; 37 import java.net.InetSocketAddress; 38 import java.net.URISyntaxException; 39 import java.util.ArrayList; 40 import java.util.Collection; 41 import java.util.List; 42 import java.util.concurrent.atomic.AtomicBoolean; 43 44 import org.apache.commons.logging.Log; 45 import org.apache.commons.logging.LogFactory; 46 import org.apache.commons.logging.impl.Log4JLogger; 47 import org.apache.hadoop.conf.Configuration; 48 import org.apache.hadoop.fs.CommonConfigurationKeys; 49 import org.apache.hadoop.fs.FSDataOutputStream; 50 import org.apache.hadoop.fs.FileSystem; 51 import org.apache.hadoop.fs.FileUtil; 52 import org.apache.hadoop.fs.Path; 53 import org.apache.hadoop.fs.StorageType; 54 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; 55 import org.apache.hadoop.hdfs.DFSConfigKeys; 56 import org.apache.hadoop.hdfs.DFSTestUtil; 57 import org.apache.hadoop.hdfs.DistributedFileSystem; 58 import org.apache.hadoop.hdfs.HdfsConfiguration; 59 import org.apache.hadoop.hdfs.MiniDFSCluster; 60 import org.apache.hadoop.hdfs.protocol.DatanodeID; 61 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 62 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 63 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 64 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; 65 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; 66 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; 67 import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord; 68 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; 69 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; 70 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; 71 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; 72 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; 73 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; 74 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; 75 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; 76 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; 77 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; 78 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; 79 import org.apache.hadoop.hdfs.server.protocol.StorageReport; 80 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; 81 import org.apache.hadoop.test.GenericTestUtils; 82 import org.apache.hadoop.util.Daemon; 83 import org.apache.hadoop.util.DataChecksum; 84 import org.apache.log4j.Level; 85 import org.junit.After; 86 import org.junit.Assert; 87 import org.junit.Before; 88 import org.junit.Test; 89 import org.mockito.Mockito; 90 import org.mockito.invocation.InvocationOnMock; 91 import org.mockito.stubbing.Answer; 92 93 /** 94 * This tests if sync all replicas in block recovery works correctly 95 */ 96 public class TestBlockRecovery { 97 private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class); 98 private static final String DATA_DIR = 99 MiniDFSCluster.getBaseDirectory() + "data"; 100 private DataNode dn; 101 private Configuration conf; 102 private final static long RECOVERY_ID = 3000L; 103 private final static String CLUSTER_ID = "testClusterID"; 104 private final static String POOL_ID = "BP-TEST"; 105 private final static InetSocketAddress NN_ADDR = new InetSocketAddress( 106 "localhost", 5020); 107 private final static long BLOCK_ID = 1000L; 108 private final static long GEN_STAMP = 2000L; 109 private final static long BLOCK_LEN = 3000L; 110 private final static long REPLICA_LEN1 = 6000L; 111 private final static long REPLICA_LEN2 = 5000L; 112 private final static ExtendedBlock block = new ExtendedBlock(POOL_ID, 113 BLOCK_ID, BLOCK_LEN, GEN_STAMP); 114 115 static { 116 ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL); 117 ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL); 118 } 119 120 /** 121 * Starts an instance of DataNode 122 * @throws IOException 123 */ 124 @Before startUp()125 public void startUp() throws IOException, URISyntaxException { 126 conf = new HdfsConfiguration(); 127 conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR); 128 conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); 129 conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); 130 conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); 131 conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); 132 FileSystem.setDefaultUri(conf, 133 "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort()); 134 ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>(); 135 File dataDir = new File(DATA_DIR); 136 FileUtil.fullyDelete(dataDir); 137 dataDir.mkdirs(); 138 StorageLocation location = StorageLocation.parse(dataDir.getPath()); 139 locations.add(location); 140 final DatanodeProtocolClientSideTranslatorPB namenode = 141 mock(DatanodeProtocolClientSideTranslatorPB.class); 142 143 Mockito.doAnswer(new Answer<DatanodeRegistration>() { 144 @Override 145 public DatanodeRegistration answer(InvocationOnMock invocation) 146 throws Throwable { 147 return (DatanodeRegistration) invocation.getArguments()[0]; 148 } 149 }).when(namenode).registerDatanode( 150 Mockito.any(DatanodeRegistration.class)); 151 152 when(namenode.versionRequest()).thenReturn(new NamespaceInfo 153 (1, CLUSTER_ID, POOL_ID, 1L)); 154 155 when(namenode.sendHeartbeat( 156 Mockito.any(DatanodeRegistration.class), 157 Mockito.any(StorageReport[].class), 158 Mockito.anyLong(), 159 Mockito.anyLong(), 160 Mockito.anyInt(), 161 Mockito.anyInt(), 162 Mockito.anyInt(), 163 Mockito.any(VolumeFailureSummary.class))) 164 .thenReturn(new HeartbeatResponse( 165 new DatanodeCommand[0], 166 new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), 167 null)); 168 169 dn = new DataNode(conf, locations, null) { 170 @Override 171 DatanodeProtocolClientSideTranslatorPB connectToNN( 172 InetSocketAddress nnAddr) throws IOException { 173 Assert.assertEquals(NN_ADDR, nnAddr); 174 return namenode; 175 } 176 }; 177 // Trigger a heartbeat so that it acknowledges the NN as active. 178 dn.getAllBpOs()[0].triggerHeartbeatForTests(); 179 } 180 181 /** 182 * Cleans the resources and closes the instance of datanode 183 * @throws IOException if an error occurred 184 */ 185 @After tearDown()186 public void tearDown() throws IOException { 187 if (dn != null) { 188 try { 189 dn.shutdown(); 190 } catch(Exception e) { 191 LOG.error("Cannot close: ", e); 192 } finally { 193 File dir = new File(DATA_DIR); 194 if (dir.exists()) 195 Assert.assertTrue( 196 "Cannot delete data-node dirs", FileUtil.fullyDelete(dir)); 197 } 198 } 199 } 200 201 /** Sync two replicas */ testSyncReplicas(ReplicaRecoveryInfo replica1, ReplicaRecoveryInfo replica2, InterDatanodeProtocol dn1, InterDatanodeProtocol dn2, long expectLen)202 private void testSyncReplicas(ReplicaRecoveryInfo replica1, 203 ReplicaRecoveryInfo replica2, 204 InterDatanodeProtocol dn1, 205 InterDatanodeProtocol dn2, 206 long expectLen) throws IOException { 207 208 DatanodeInfo[] locs = new DatanodeInfo[]{ 209 mock(DatanodeInfo.class), mock(DatanodeInfo.class)}; 210 RecoveringBlock rBlock = new RecoveringBlock(block, 211 locs, RECOVERY_ID); 212 ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2); 213 BlockRecord record1 = new BlockRecord( 214 DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1); 215 BlockRecord record2 = new BlockRecord( 216 DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2); 217 syncList.add(record1); 218 syncList.add(record2); 219 220 when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), 221 anyLong(), anyLong())).thenReturn("storage1"); 222 when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), 223 anyLong(), anyLong())).thenReturn("storage2"); 224 dn.syncBlock(rBlock, syncList); 225 } 226 227 /** 228 * BlockRecovery_02.8. 229 * Two replicas are in Finalized state 230 * @throws IOException in case of an error 231 */ 232 @Test testFinalizedReplicas()233 public void testFinalizedReplicas () throws IOException { 234 if(LOG.isDebugEnabled()) { 235 LOG.debug("Running " + GenericTestUtils.getMethodName()); 236 } 237 ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 238 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); 239 ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 240 REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED); 241 242 InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); 243 InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); 244 245 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 246 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 247 REPLICA_LEN1); 248 verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 249 REPLICA_LEN1); 250 251 // two finalized replicas have different length 252 replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 253 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); 254 replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 255 REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED); 256 257 try { 258 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 259 Assert.fail("Two finalized replicas should not have different lengthes!"); 260 } catch (IOException e) { 261 Assert.assertTrue(e.getMessage().startsWith( 262 "Inconsistent size of finalized replicas. ")); 263 } 264 } 265 266 /** 267 * BlockRecovery_02.9. 268 * One replica is Finalized and another is RBW. 269 * @throws IOException in case of an error 270 */ 271 @Test testFinalizedRbwReplicas()272 public void testFinalizedRbwReplicas() throws IOException { 273 if(LOG.isDebugEnabled()) { 274 LOG.debug("Running " + GenericTestUtils.getMethodName()); 275 } 276 277 // rbw and finalized replicas have the same length 278 ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 279 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); 280 ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 281 REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW); 282 283 InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); 284 InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); 285 286 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 287 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 288 REPLICA_LEN1); 289 verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 290 REPLICA_LEN1); 291 292 // rbw replica has a different length from the finalized one 293 replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 294 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); 295 replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 296 REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); 297 298 dn1 = mock(InterDatanodeProtocol.class); 299 dn2 = mock(InterDatanodeProtocol.class); 300 301 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 302 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); 303 verify(dn2, never()).updateReplicaUnderRecovery( 304 block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); 305 } 306 307 /** 308 * BlockRecovery_02.10. 309 * One replica is Finalized and another is RWR. 310 * @throws IOException in case of an error 311 */ 312 @Test testFinalizedRwrReplicas()313 public void testFinalizedRwrReplicas() throws IOException { 314 if(LOG.isDebugEnabled()) { 315 LOG.debug("Running " + GenericTestUtils.getMethodName()); 316 } 317 318 // rbw and finalized replicas have the same length 319 ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 320 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); 321 ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 322 REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); 323 324 InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); 325 InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); 326 327 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 328 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 329 REPLICA_LEN1); 330 verify(dn2, never()).updateReplicaUnderRecovery( 331 block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); 332 333 // rbw replica has a different length from the finalized one 334 replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 335 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED); 336 replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 337 REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); 338 339 dn1 = mock(InterDatanodeProtocol.class); 340 dn2 = mock(InterDatanodeProtocol.class); 341 342 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 343 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 344 REPLICA_LEN1); 345 verify(dn2, never()).updateReplicaUnderRecovery( 346 block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); 347 } 348 349 /** 350 * BlockRecovery_02.11. 351 * Two replicas are RBW. 352 * @throws IOException in case of an error 353 */ 354 @Test testRBWReplicas()355 public void testRBWReplicas() throws IOException { 356 if(LOG.isDebugEnabled()) { 357 LOG.debug("Running " + GenericTestUtils.getMethodName()); 358 } 359 ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 360 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); 361 ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 362 REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW); 363 364 InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); 365 InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); 366 367 long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); 368 testSyncReplicas(replica1, replica2, dn1, dn2, minLen); 369 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); 370 verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); 371 } 372 373 /** 374 * BlockRecovery_02.12. 375 * One replica is RBW and another is RWR. 376 * @throws IOException in case of an error 377 */ 378 @Test testRBW_RWRReplicas()379 public void testRBW_RWRReplicas() throws IOException { 380 if(LOG.isDebugEnabled()) { 381 LOG.debug("Running " + GenericTestUtils.getMethodName()); 382 } 383 ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 384 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW); 385 ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 386 REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR); 387 388 InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); 389 InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); 390 391 testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1); 392 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, 393 REPLICA_LEN1); 394 verify(dn2, never()).updateReplicaUnderRecovery( 395 block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1); 396 } 397 398 /** 399 * BlockRecovery_02.13. 400 * Two replicas are RWR. 401 * @throws IOException in case of an error 402 */ 403 @Test testRWRReplicas()404 public void testRWRReplicas() throws IOException { 405 if(LOG.isDebugEnabled()) { 406 LOG.debug("Running " + GenericTestUtils.getMethodName()); 407 } 408 ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID, 409 REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR); 410 ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID, 411 REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR); 412 413 InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class); 414 InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class); 415 416 long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2); 417 testSyncReplicas(replica1, replica2, dn1, dn2, minLen); 418 419 verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); 420 verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen); 421 } 422 initRecoveringBlocks()423 private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException { 424 Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1); 425 DatanodeInfo mockOtherDN = DFSTestUtil.getLocalDatanodeInfo(); 426 DatanodeInfo[] locs = new DatanodeInfo[] { 427 new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())), 428 mockOtherDN }; 429 RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID); 430 blocks.add(rBlock); 431 return blocks; 432 } 433 /** 434 * BlockRecoveryFI_05. One DN throws RecoveryInProgressException. 435 * 436 * @throws IOException 437 * in case of an error 438 */ 439 @Test testRecoveryInProgressException()440 public void testRecoveryInProgressException() 441 throws IOException, InterruptedException { 442 if(LOG.isDebugEnabled()) { 443 LOG.debug("Running " + GenericTestUtils.getMethodName()); 444 } 445 DataNode spyDN = spy(dn); 446 doThrow(new RecoveryInProgressException("Replica recovery is in progress")). 447 when(spyDN).initReplicaRecovery(any(RecoveringBlock.class)); 448 Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks()); 449 d.join(); 450 verify(spyDN, never()).syncBlock( 451 any(RecoveringBlock.class), anyListOf(BlockRecord.class)); 452 } 453 454 /** 455 * BlockRecoveryFI_06. all datanodes throws an exception. 456 * 457 * @throws IOException 458 * in case of an error 459 */ 460 @Test testErrorReplicas()461 public void testErrorReplicas() throws IOException, InterruptedException { 462 if(LOG.isDebugEnabled()) { 463 LOG.debug("Running " + GenericTestUtils.getMethodName()); 464 } 465 DataNode spyDN = spy(dn); 466 doThrow(new IOException()). 467 when(spyDN).initReplicaRecovery(any(RecoveringBlock.class)); 468 Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks()); 469 d.join(); 470 verify(spyDN, never()).syncBlock( 471 any(RecoveringBlock.class), anyListOf(BlockRecord.class)); 472 } 473 474 /** 475 * BlockRecoveryFI_07. max replica length from all DNs is zero. 476 * 477 * @throws IOException in case of an error 478 */ 479 @Test testZeroLenReplicas()480 public void testZeroLenReplicas() throws IOException, InterruptedException { 481 if(LOG.isDebugEnabled()) { 482 LOG.debug("Running " + GenericTestUtils.getMethodName()); 483 } 484 DataNode spyDN = spy(dn); 485 doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0, 486 block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN). 487 initReplicaRecovery(any(RecoveringBlock.class)); 488 Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks()); 489 d.join(); 490 DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID); 491 verify(dnP).commitBlockSynchronization( 492 block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null); 493 } 494 initBlockRecords(DataNode spyDN)495 private List<BlockRecord> initBlockRecords(DataNode spyDN) throws IOException { 496 List<BlockRecord> blocks = new ArrayList<BlockRecord>(1); 497 DatanodeRegistration dnR = dn.getDNRegistrationForBP(block.getBlockPoolId()); 498 BlockRecord blockRecord = new BlockRecord( 499 new DatanodeID(dnR), spyDN, 500 new ReplicaRecoveryInfo(block.getBlockId(), block.getNumBytes(), 501 block.getGenerationStamp(), ReplicaState.FINALIZED)); 502 blocks.add(blockRecord); 503 return blocks; 504 } 505 506 private final static RecoveringBlock rBlock = 507 new RecoveringBlock(block, null, RECOVERY_ID); 508 509 /** 510 * BlockRecoveryFI_09. some/all DNs failed to update replicas. 511 * 512 * @throws IOException in case of an error 513 */ 514 @Test testFailedReplicaUpdate()515 public void testFailedReplicaUpdate() throws IOException { 516 if(LOG.isDebugEnabled()) { 517 LOG.debug("Running " + GenericTestUtils.getMethodName()); 518 } 519 DataNode spyDN = spy(dn); 520 doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery( 521 block, RECOVERY_ID, BLOCK_ID, block.getNumBytes()); 522 try { 523 spyDN.syncBlock(rBlock, initBlockRecords(spyDN)); 524 fail("Sync should fail"); 525 } catch (IOException e) { 526 e.getMessage().startsWith("Cannot recover "); 527 } 528 } 529 530 /** 531 * BlockRecoveryFI_10. DN has no ReplicaUnderRecovery. 532 * 533 * @throws IOException in case of an error 534 */ 535 @Test testNoReplicaUnderRecovery()536 public void testNoReplicaUnderRecovery() throws IOException { 537 if(LOG.isDebugEnabled()) { 538 LOG.debug("Running " + GenericTestUtils.getMethodName()); 539 } 540 dn.data.createRbw(StorageType.DEFAULT, block, false); 541 try { 542 dn.syncBlock(rBlock, initBlockRecords(dn)); 543 fail("Sync should fail"); 544 } catch (IOException e) { 545 e.getMessage().startsWith("Cannot recover "); 546 } 547 DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); 548 verify(namenode, never()).commitBlockSynchronization( 549 any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), 550 anyBoolean(), any(DatanodeID[].class), any(String[].class)); 551 } 552 553 /** 554 * BlockRecoveryFI_11. a replica's recovery id does not match new GS. 555 * 556 * @throws IOException in case of an error 557 */ 558 @Test testNotMatchedReplicaID()559 public void testNotMatchedReplicaID() throws IOException { 560 if(LOG.isDebugEnabled()) { 561 LOG.debug("Running " + GenericTestUtils.getMethodName()); 562 } 563 ReplicaInPipelineInterface replicaInfo = dn.data.createRbw( 564 StorageType.DEFAULT, block, false).getReplica(); 565 ReplicaOutputStreams streams = null; 566 try { 567 streams = replicaInfo.createStreams(true, 568 DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); 569 streams.getChecksumOut().write('a'); 570 dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); 571 try { 572 dn.syncBlock(rBlock, initBlockRecords(dn)); 573 fail("Sync should fail"); 574 } catch (IOException e) { 575 e.getMessage().startsWith("Cannot recover "); 576 } 577 DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); 578 verify(namenode, never()).commitBlockSynchronization( 579 any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), 580 anyBoolean(), any(DatanodeID[].class), any(String[].class)); 581 } finally { 582 streams.close(); 583 } 584 } 585 586 /** 587 * Test to verify the race between finalizeBlock and Lease recovery 588 * 589 * @throws Exception 590 */ 591 @Test(timeout = 20000) testRaceBetweenReplicaRecoveryAndFinalizeBlock()592 public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception { 593 tearDown();// Stop the Mocked DN started in startup() 594 595 Configuration conf = new HdfsConfiguration(); 596 conf.set(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000"); 597 MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) 598 .numDataNodes(1).build(); 599 try { 600 cluster.waitClusterUp(); 601 DistributedFileSystem fs = cluster.getFileSystem(); 602 Path path = new Path("/test"); 603 FSDataOutputStream out = fs.create(path); 604 out.writeBytes("data"); 605 out.hsync(); 606 607 List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path)); 608 final LocatedBlock block = blocks.get(0); 609 final DataNode dataNode = cluster.getDataNodes().get(0); 610 611 final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); 612 Thread recoveryThread = new Thread() { 613 @Override 614 public void run() { 615 try { 616 DatanodeInfo[] locations = block.getLocations(); 617 final RecoveringBlock recoveringBlock = new RecoveringBlock( 618 block.getBlock(), locations, block.getBlock() 619 .getGenerationStamp() + 1); 620 synchronized (dataNode.data) { 621 Thread.sleep(2000); 622 dataNode.initReplicaRecovery(recoveringBlock); 623 } 624 } catch (Exception e) { 625 recoveryInitResult.set(false); 626 } 627 } 628 }; 629 recoveryThread.start(); 630 try { 631 out.close(); 632 } catch (IOException e) { 633 Assert.assertTrue("Writing should fail", 634 e.getMessage().contains("are bad. Aborting...")); 635 } finally { 636 recoveryThread.join(); 637 } 638 Assert.assertTrue("Recovery should be initiated successfully", 639 recoveryInitResult.get()); 640 641 dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() 642 .getGenerationStamp() + 1, block.getBlock().getBlockId(), 643 block.getBlockSize()); 644 } finally { 645 if (null != cluster) { 646 cluster.shutdown(); 647 cluster = null; 648 } 649 } 650 } 651 } 652