/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests that syncing all replicas during block recovery works correctly.
 */
public class TestBlockRecovery {
  private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
  private static final String DATA_DIR =
    MiniDFSCluster.getBaseDirectory() + "data";
  private DataNode dn;
  private Configuration conf;
  private final static long RECOVERY_ID = 3000L;
  private final static String CLUSTER_ID = "testClusterID";
  private final static String POOL_ID = "BP-TEST";
  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
      "localhost", 5020);
  private final static long BLOCK_ID = 1000L;
  private final static long GEN_STAMP = 2000L;
  private final static long BLOCK_LEN = 3000L;
  private final static long REPLICA_LEN1 = 6000L;
  private final static long REPLICA_LEN2 = 5000L;
  private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
      BLOCK_ID, BLOCK_LEN, GEN_STAMP);

  static {
    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
  }

  /**
   * Starts an instance of DataNode backed by a mocked NameNode.
   * @throws IOException if an error occurs
   */
  @Before
  public void startUp() throws IOException, URISyntaxException {
    conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
    FileSystem.setDefaultUri(conf,
        "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
    File dataDir = new File(DATA_DIR);
    FileUtil.fullyDelete(dataDir);
    dataDir.mkdirs();
    StorageLocation location = StorageLocation.parse(dataDir.getPath());
    locations.add(location);
    final DatanodeProtocolClientSideTranslatorPB namenode =
      mock(DatanodeProtocolClientSideTranslatorPB.class);

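    // Echo any registration straight back so the DataNode believes the
    // mocked NN accepted it.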
    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
      @Override
      public DatanodeRegistration answer(InvocationOnMock invocation)
          throws Throwable {
        return (DatanodeRegistration) invocation.getArguments()[0];
      }
    }).when(namenode).registerDatanode(
        Mockito.any(DatanodeRegistration.class));

    when(namenode.versionRequest()).thenReturn(new NamespaceInfo(
        1, CLUSTER_ID, POOL_ID, 1L));

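    // Heartbeat responses report an ACTIVE NN so the BPOfferService can
    // mark this (mock) NameNode as the active one.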
    when(namenode.sendHeartbeat(
            Mockito.any(DatanodeRegistration.class),
            Mockito.any(StorageReport[].class),
            Mockito.anyLong(),
            Mockito.anyLong(),
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class)))
        .thenReturn(new HeartbeatResponse(
            new DatanodeCommand[0],
            new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
            null));

    dn = new DataNode(conf, locations, null) {
      @Override
      DatanodeProtocolClientSideTranslatorPB connectToNN(
          InetSocketAddress nnAddr) throws IOException {
        Assert.assertEquals(NN_ADDR, nnAddr);
        return namenode;
      }
    };
    // Trigger a heartbeat so that it acknowledges the NN as active.
    dn.getAllBpOs()[0].triggerHeartbeatForTests();
  }

  /**
   * Cleans up resources and closes the DataNode instance.
   * @throws IOException if an error occurred
   */
  @After
  public void tearDown() throws IOException {
    if (dn != null) {
      try {
        dn.shutdown();
      } catch (Exception e) {
        LOG.error("Cannot close: ", e);
      } finally {
        File dir = new File(DATA_DIR);
        if (dir.exists())
          Assert.assertTrue(
              "Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
      }
    }
  }

  /**
   * Syncs two replicas via {@link DataNode#syncBlock} using mocked
   * inter-DataNode connections.
   */
  private void testSyncReplicas(ReplicaRecoveryInfo replica1,
      ReplicaRecoveryInfo replica2,
      InterDatanodeProtocol dn1,
      InterDatanodeProtocol dn2,
      long expectLen) throws IOException {

    DatanodeInfo[] locs = new DatanodeInfo[]{
        mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
    RecoveringBlock rBlock = new RecoveringBlock(block,
        locs, RECOVERY_ID);
    ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
    BlockRecord record1 = new BlockRecord(
        DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1);
    BlockRecord record2 = new BlockRecord(
        DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
    syncList.add(record1);
    syncList.add(record2);

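    // Stub the inter-DN calls so syncBlock sees both updates succeed and
    // gets a storage ID back from each DataNode.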
    when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
        anyLong(), anyLong())).thenReturn("storage1");
    when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
        anyLong(), anyLong())).thenReturn("storage2");
    dn.syncBlock(rBlock, syncList);
  }

  /**
   * BlockRecovery_02.8.
   * Two replicas are in Finalized state.
   * @throws IOException in case of an error
   */
  @Test
  public void testFinalizedReplicas() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
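    // Both replicas are finalized with the same length: both DataNodes
    // must be updated to that length.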
    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);

    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);

    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    // two finalized replicas have different lengths
    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);

    try {
      testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
      Assert.fail("Two finalized replicas should not have different lengths!");
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().startsWith(
          "Inconsistent size of finalized replicas. "));
    }
  }

  /**
   * BlockRecovery_02.9.
   * One replica is Finalized and another is RBW.
   * @throws IOException in case of an error
   */
  @Test
  public void testFinalizedRbwReplicas() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }

    // rbw and finalized replicas have the same length
    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);

    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);

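    // With equal lengths the RBW replica participates alongside the
    // finalized one: both must be updated.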
    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);

    // rbw replica has a different length from the finalized one
    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);

    dn1 = mock(InterDatanodeProtocol.class);
    dn2 = mock(InterDatanodeProtocol.class);

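    // The RBW replica's length disagrees with the finalized one, so only
    // the finalized replica is kept and updated.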
    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    verify(dn2, never()).updateReplicaUnderRecovery(
        block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
  }

  /**
   * BlockRecovery_02.10.
   * One replica is Finalized and another is RWR.
   * @throws IOException in case of an error
   */
  @Test
  public void testFinalizedRwrReplicas() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }

    // rwr and finalized replicas have the same length
    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);

    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);

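    // An RWR replica never participates when a finalized replica exists,
    // even if the lengths match.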
    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    verify(dn2, never()).updateReplicaUnderRecovery(
        block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);

    // rwr replica has a different length from the finalized one
    replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
    replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);

    dn1 = mock(InterDatanodeProtocol.class);
    dn2 = mock(InterDatanodeProtocol.class);

    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    verify(dn2, never()).updateReplicaUnderRecovery(
        block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
  }

  /**
   * BlockRecovery_02.11.
   * Two replicas are RBW.
   * @throws IOException in case of an error
   */
  @Test
  public void testRBWReplicas() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);

    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);

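    // Both RBW replicas survive recovery; each is truncated to the
    // shorter of the two lengths.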
    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
    testSyncReplicas(replica1, replica2, dn1, dn2, minLen);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
  }

  /**
   * BlockRecovery_02.12.
   * One replica is RBW and another is RWR.
   * @throws IOException in case of an error
   */
  @Test
  public void testRBW_RWRReplicas() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);

    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);

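    // The RBW replica takes precedence; the RWR replica is excluded from
    // recovery even though the lengths match.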
    testSyncReplicas(replica1, replica2, dn1, dn2, REPLICA_LEN1);
    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID,
        REPLICA_LEN1);
    verify(dn2, never()).updateReplicaUnderRecovery(
        block, RECOVERY_ID, BLOCK_ID, REPLICA_LEN1);
  }

  /**
   * BlockRecovery_02.13.
   * Two replicas are RWR.
   * @throws IOException in case of an error
   */
  @Test
  public void testRWRReplicas() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
    ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
    ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
        REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);

    InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
    InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);

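    // With only RWR replicas available, both participate and are truncated
    // to the shorter length.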
    long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
    testSyncReplicas(replica1, replica2, dn1, dn2, minLen);

    verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
    verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, BLOCK_ID, minLen);
  }

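  /** Builds one RecoveringBlock located on this DataNode plus a mock remote DN. */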
  private Collection<RecoveringBlock> initRecoveringBlocks() throws IOException {
    Collection<RecoveringBlock> blocks = new ArrayList<RecoveringBlock>(1);
    DatanodeInfo mockOtherDN = DFSTestUtil.getLocalDatanodeInfo();
    DatanodeInfo[] locs = new DatanodeInfo[] {
        new DatanodeInfo(dn.getDNRegistrationForBP(block.getBlockPoolId())),
        mockOtherDN };
    RecoveringBlock rBlock = new RecoveringBlock(block, locs, RECOVERY_ID);
    blocks.add(rBlock);
    return blocks;
  }

  /**
   * BlockRecoveryFI_05. One DN throws RecoveryInProgressException.
   *
   * @throws IOException
   *           in case of an error
   */
  @Test
  public void testRecoveryInProgressException()
    throws IOException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
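    // When initReplicaRecovery reports a recovery already in progress,
    // recoverBlocks must give up without ever calling syncBlock.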
    DataNode spyDN = spy(dn);
    doThrow(new RecoveryInProgressException("Replica recovery is in progress"))
        .when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
    Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
    d.join();
    verify(spyDN, never()).syncBlock(
        any(RecoveringBlock.class), anyListOf(BlockRecord.class));
  }

  /**
   * BlockRecoveryFI_06. All DataNodes throw an exception.
   *
   * @throws IOException
   *           in case of an error
   */
  @Test
  public void testErrorReplicas() throws IOException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
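    // An IOException from initReplicaRecovery on every DN must likewise
    // abort the recovery before syncBlock is reached.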
    DataNode spyDN = spy(dn);
    doThrow(new IOException())
        .when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
    Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
    d.join();
    verify(spyDN, never()).syncBlock(
        any(RecoveringBlock.class), anyListOf(BlockRecord.class));
  }

  /**
   * BlockRecoveryFI_07. Max replica length from all DNs is zero.
   *
   * @throws IOException in case of an error
   */
  @Test
  public void testZeroLenReplicas() throws IOException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
    DataNode spyDN = spy(dn);
    doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
        block.getGenerationStamp(), ReplicaState.FINALIZED))
        .when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
    Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
    d.join();
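    // Every replica reported length zero, so the DN should tell the NN to
    // delete the block (newLength=0, closeFile=true, deleteblock=true).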
    DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
    verify(dnP).commitBlockSynchronization(
        block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
  }

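  /** Builds one BlockRecord that points back at this DataNode's own replica. */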
  private List<BlockRecord> initBlockRecords(DataNode spyDN) throws IOException {
    List<BlockRecord> blocks = new ArrayList<BlockRecord>(1);
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(block.getBlockPoolId());
    BlockRecord blockRecord = new BlockRecord(
        new DatanodeID(dnR), spyDN,
        new ReplicaRecoveryInfo(block.getBlockId(), block.getNumBytes(),
            block.getGenerationStamp(), ReplicaState.FINALIZED));
    blocks.add(blockRecord);
    return blocks;
  }

  private final static RecoveringBlock rBlock =
    new RecoveringBlock(block, null, RECOVERY_ID);

  /**
   * BlockRecoveryFI_09. Some or all DNs failed to update replicas.
   *
   * @throws IOException in case of an error
   */
  @Test
  public void testFailedReplicaUpdate() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
    DataNode spyDN = spy(dn);
    doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery(
        block, RECOVERY_ID, BLOCK_ID, block.getNumBytes());
    try {
      spyDN.syncBlock(rBlock, initBlockRecords(spyDN));
      fail("Sync should fail");
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().startsWith("Cannot recover "));
    }
  }

  /**
   * BlockRecoveryFI_10. DN has no ReplicaUnderRecovery.
   *
   * @throws IOException in case of an error
   */
  @Test
  public void testNoReplicaUnderRecovery() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
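    // Create a plain RBW replica that was never converted to a
    // ReplicaUnderRecovery; syncBlock must refuse to recover it.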
    dn.data.createRbw(StorageType.DEFAULT, block, false);
    try {
      dn.syncBlock(rBlock, initBlockRecords(dn));
      fail("Sync should fail");
    } catch (IOException e) {
      Assert.assertTrue(e.getMessage().startsWith("Cannot recover "));
    }
    DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
    verify(namenode, never()).commitBlockSynchronization(
        any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
        anyBoolean(), any(DatanodeID[].class), any(String[].class));
  }

  /**
   * BlockRecoveryFI_11. A replica's recovery id does not match the new GS.
   *
   * @throws IOException in case of an error
   */
  @Test
  public void testNotMatchedReplicaID() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Running " + GenericTestUtils.getMethodName());
    }
    ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
        StorageType.DEFAULT, block, false).getReplica();
    ReplicaOutputStreams streams = null;
    try {
      streams = replicaInfo.createStreams(true,
          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
      streams.getChecksumOut().write('a');
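      // Move the replica to recovery id RECOVERY_ID+1, so the following
      // syncBlock (which uses RECOVERY_ID) sees a stale recovery id and fails.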
      dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
      try {
        dn.syncBlock(rBlock, initBlockRecords(dn));
        fail("Sync should fail");
      } catch (IOException e) {
        Assert.assertTrue(e.getMessage().startsWith("Cannot recover "));
      }
      DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
      verify(namenode, never()).commitBlockSynchronization(
          any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
          anyBoolean(), any(DatanodeID[].class), any(String[].class));
    } finally {
      if (streams != null) {
        streams.close();
      }
    }
  }

  /**
   * Test to verify the race between finalizeBlock and lease recovery.
   *
   * @throws Exception if an error occurs
   */
  @Test(timeout = 20000)
  public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
    tearDown(); // Stop the mocked DN started in startUp().

    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000");
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    try {
      cluster.waitClusterUp();
      DistributedFileSystem fs = cluster.getFileSystem();
      Path path = new Path("/test");
      FSDataOutputStream out = fs.create(path);
      out.writeBytes("data");
      out.hsync();

      List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
      final LocatedBlock block = blocks.get(0);
      final DataNode dataNode = cluster.getDataNodes().get(0);

      final AtomicBoolean recoveryInitResult = new AtomicBoolean(true);
      Thread recoveryThread = new Thread() {
        @Override
        public void run() {
          try {
            DatanodeInfo[] locations = block.getLocations();
            final RecoveringBlock recoveringBlock = new RecoveringBlock(
                block.getBlock(), locations, block.getBlock()
                    .getGenerationStamp() + 1);
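            // Hold the dataset lock and delay, so recovery initialization
            // overlaps the finalizeBlock triggered by out.close() on the
            // main thread.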
            synchronized (dataNode.data) {
              Thread.sleep(2000);
              dataNode.initReplicaRecovery(recoveringBlock);
            }
          } catch (Exception e) {
            recoveryInitResult.set(false);
          }
        }
      };
      recoveryThread.start();
      try {
        out.close();
      } catch (IOException e) {
        Assert.assertTrue("Writing should fail",
            e.getMessage().contains("are bad. Aborting..."));
      } finally {
        recoveryThread.join();
      }
      Assert.assertTrue("Recovery should be initiated successfully",
          recoveryInitResult.get());

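      // The replica was finalized while under recovery; updating it with
      // the new recovery id must still succeed.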
      dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
          .getGenerationStamp() + 1, block.getBlock().getBlockId(),
          block.getBlockSize());
    } finally {
      if (null != cluster) {
        cluster.shutdown();
        cluster = null;
      }
    }
  }
}