/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Fine-grain testing of block files and locations after volume failure.
 */
public class TestDataNodeVolumeFailure {
  final private int block_size = 512;
  MiniDFSCluster cluster = null;
  private Configuration conf;
  final int dn_num = 2;
  final int blocks_num = 30;
  final short repl = 2;
  File dataDir = null;
  File data_fail = null;
  File failedDir = null;
  private FileSystem fs;

  // mapping blocks to meta files (physical files) and locs (NameNode locations)
  private class BlockLocs {
    public int num_files = 0;
    public int num_locs = 0;
  }
  // block id to BlockLocs
  final Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs> ();

  @Before
  public void setUp() throws Exception {
    // bring up a cluster of 2
    conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size);
    // Allow a single volume failure (there are two volumes)
    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    dataDir = new File(cluster.getDataDirectory());
  }

  @After
  public void tearDown() throws Exception {
    if(data_fail != null) {
      FileUtil.setWritable(data_fail, true);
    }
    if(failedDir != null) {
      FileUtil.setWritable(failedDir, true);
    }
    if(cluster != null) {
      cluster.shutdown();
    }
  }

  /*
   * Verify the number of blocks and files are correct after volume failure,
   * and that we can replicate to both datanodes even after a single volume
   * failure if the configuration parameter allows this.
   */
  @Test
  public void testVolumeFailure() throws Exception {
    System.out.println("Data dir is " + dataDir.getPath());

    // Data dir structure is dataDir/data[1-4]/[current,tmp...]
    // data1,2 is for datanode 1; data3,4 is for datanode 2
    String filename = "/test.txt";
    Path filePath = new Path(filename);

    // we use only a small number of blocks to avoid creating subdirs in the data dir
    int filesize = block_size*blocks_num;
    DFSTestUtil.createFile(fs, filePath, filesize, repl, 1L);
    DFSTestUtil.waitReplication(fs, filePath, repl);
    System.out.println("file " + filename + "(size " +
        filesize + ") is created and replicated");

    // fail the volume
    // delete/make non-writable one of the directories (failed volume)
    data_fail = new File(dataDir, "data3");
    failedDir = MiniDFSCluster.getFinalizedDir(data_fail,
        cluster.getNamesystem().getBlockPoolId());
    if (failedDir.exists() &&
        //!FileUtil.fullyDelete(failedDir)
        !deleteBlocks(failedDir)
        ) {
      throw new IOException("Could not delete hdfs directory '" + failedDir + "'");
    }
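    // After deleting the block files, mark the volume directories read-only so
    // any further access to this volume fails.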
    data_fail.setReadOnly();
    failedDir.setReadOnly();
    System.out.println("Deleting " + failedDir.getPath() + "; exist=" + failedDir.exists());

    // access all the blocks on the "failed" DataNode,
    // we need to make sure that the "failed" volume is being accessed -
    // and that will cause failure, blocks removal, "emergency" block report
    triggerFailure(filename, filesize);

    // make sure a block report is sent
    DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
    String bpid = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);

    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpid);

    // Send block report
    StorageBlockReport[] reports =
        new StorageBlockReport[perVolumeBlockLists.size()];

    int reportIndex = 0;
    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
        DatanodeStorage dnStorage = kvPair.getKey();
        BlockListAsLongs blockList = kvPair.getValue();
        reports[reportIndex++] =
            new StorageBlockReport(dnStorage, blockList);
    }

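    // Deliver the per-volume reports straight to the NameNode over RPC so it
    // immediately learns which replicas remain after the volume failure.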
    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);

    // verify number of blocks and files...
    verify(filename, filesize);

    // create another file (with one volume failed).
    System.out.println("creating file test1.txt");
    Path fileName1 = new Path("/test1.txt");
    DFSTestUtil.createFile(fs, fileName1, filesize, repl, 1L);

    // should be able to replicate to both nodes (2 DN, repl=2)
    DFSTestUtil.waitReplication(fs, fileName1, repl);
    System.out.println("file " + fileName1.getName() +
        " is created and replicated");
  }

  /**
   * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
   * after failure.
   */
  @Test(timeout=150000)
  public void testFailedVolumeBeingRemovedFromDataNode()
      throws InterruptedException, IOException, TimeoutException {
    Path file1 = new Path("/test1");
    DFSTestUtil.createFile(fs, file1, 1024, (short) 2, 1L);
    DFSTestUtil.waitReplication(fs, file1, (short) 2);

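    // Fail the first volume of DN0 and kick off an asynchronous disk check so
    // the DataNode detects the failure.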
    File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
    DataNode dn0 = cluster.getDataNodes().get(0);
    long lastDiskErrorCheck = dn0.getLastDiskErrorCheck();
    dn0.checkDiskErrorAsync();
    // Wait for the checkDiskError thread to finish and discover the volume failure.
    while (dn0.getLastDiskErrorCheck() == lastDiskErrorCheck) {
      Thread.sleep(100);
    }

    // Verify dn0Vol1 has been completely removed from DN0.
    // 1. dn0Vol1 is removed from DataStorage.
    DataStorage storage = dn0.getStorage();
    assertEquals(1, storage.getNumStorageDirs());
    for (int i = 0; i < storage.getNumStorageDirs(); i++) {
      Storage.StorageDirectory sd = storage.getStorageDir(i);
      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
          dn0Vol1.getAbsolutePath()
      ));
    }
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    BlockPoolSliceStorage bpsStorage = storage.getBPStorage(bpid);
    assertEquals(1, bpsStorage.getNumStorageDirs());
    for (int i = 0; i < bpsStorage.getNumStorageDirs(); i++) {
      Storage.StorageDirectory sd = bpsStorage.getStorageDir(i);
      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
          dn0Vol1.getAbsolutePath()
      ));
    }

    // 2. dn0Vol1 is removed from FsDataset
    FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
    for (FsVolumeSpi volume : data.getVolumes()) {
      assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
          dn0Vol1.getAbsoluteFile());
    }

    // 3. all blocks on dn0Vol1 have been removed.
    for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
      assertNotNull(replica.getVolume());
      assertNotEquals(
          new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
          dn0Vol1.getAbsoluteFile());
    }

    // 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.
    String[] dataDirStrs =
        dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
    assertEquals(1, dataDirStrs.length);
    assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
  }

  /**
   * Test that there are under-replicated blocks after volume failures.
   */
  @Test
  public void testUnderReplicationAfterVolFailure() throws Exception {
    // This test relies on denying access to data volumes to simulate data volume
    // failure.  This doesn't work on Windows, because an owner of an object
    // always has the ability to read and change permissions on the object.
    assumeTrue(!Path.WINDOWS);

    // Bring up one more datanode
    cluster.startDataNodes(conf, 1, true, null, null);
    cluster.waitActive();

    final BlockManager bm = cluster.getNamesystem().getBlockManager();

    Path file1 = new Path("/test1");
    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
    DFSTestUtil.waitReplication(fs, file1, (short)3);

    // Fail the first volume on both datanodes
    File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
    File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
    DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1);
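    // Replicas stored on the failed volumes are lost, so some blocks should
    // now be reported as under-replicated.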

    Path file2 = new Path("/test2");
    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
    DFSTestUtil.waitReplication(fs, file2, (short)3);

    // underReplicatedBlocks are due to failed volumes
    int underReplicatedBlocks =
        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
            cluster.getNamesystem(), bm);
    assertTrue("There is no under replicated block after volume failure",
        underReplicatedBlocks > 0);
  }

  /**
   * Verifies two things:
   *  1. the number of locations of each block in the NameNode matches
   *     the number of actual block files
   *  2. block files plus under-replicated and pending replications equal
   *     the total number of blocks of the file, including replication
   *     (the HDFS file has 30 blocks, repl=2, so 60 in total)
   * @param fn - file name
   * @param fs - file size
   * @throws IOException
   */
  private void verify(String fn, int fs) throws IOException{
    // now count how many physical blocks are there
    int totalReal = countRealBlocks(block_map);
    System.out.println("countRealBlocks counted " + totalReal + " blocks");

    // count how many blocks are stored in NN structures.
    int totalNN = countNNBlocks(block_map, fn, fs);
    System.out.println("countNNBlocks counted " + totalNN + " blocks");

    for(String bid : block_map.keySet()) {
      BlockLocs bl = block_map.get(bid);
      // System.out.println(bid + "->" + bl.num_files + "vs." + bl.num_locs);
      // number of physical files (1 or 2) should be same as number of datanodes
      // in the list of the block locations
      assertEquals("Num files should match num locations",
          bl.num_files, bl.num_locs);
    }
    assertEquals("Num physical blocks should match num stored in the NN",
        totalReal, totalNN);

    // now check the number of under-replicated blocks
    FSNamesystem fsn = cluster.getNamesystem();
    // force update of all the metric counts by calling computeDatanodeWork
    BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager());
    // get all the counts
    long underRepl = fsn.getUnderReplicatedBlocks();
    long pendRepl = fsn.getPendingReplicationBlocks();
    long totalRepl = underRepl + pendRepl;
    System.out.println("underreplicated after = "+ underRepl +
        " and pending repl ="  + pendRepl + "; total underRepl = " + totalRepl);

    System.out.println("total blocks (real and replicating):" +
        (totalReal + totalRepl) + " vs. all files blocks " + blocks_num*2);

    // real blocks plus under-replicated and pending replications should
    // together equal the file's total block count
    assertEquals("Incorrect total block count",
        totalReal + totalRepl, blocks_num * repl);
  }

  /**
   * Access each block on the 2nd DataNode until one of the accesses fails.
   * @param path
   * @param size
   * @throws IOException
   */
  private void triggerFailure(String path, long size) throws IOException {
    NamenodeProtocols nn = cluster.getNameNodeRpc();
    List<LocatedBlock> locatedBlocks =
      nn.getBlockLocations(path, 0, size).getLocatedBlocks();

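    // Read the replica at location index 1 of each block until one of the
    // reads fails, which indicates the failed volume has been hit.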
    for (LocatedBlock lb : locatedBlocks) {
      DatanodeInfo dinfo = lb.getLocations()[1];
      ExtendedBlock b = lb.getBlock();
      try {
        accessBlock(dinfo, lb);
      } catch (IOException e) {
        System.out.println("Failure triggered, on block: " + b.getBlockId() +
            "; corresponding volume should be removed by now");
        break;
      }
    }
  }

  /**
   * Simulate a failure by deleting all the block files in the given directory.
   * @param dir directory containing the block files
   * @return true if all block files were deleted
   */
  private boolean deleteBlocks(File dir) {
    File [] fileList = dir.listFiles();
    for(File f : fileList) {
      if(f.getName().startsWith(Block.BLOCK_FILE_PREFIX)) {
        if(!f.delete())
          return false;
      }
    }
    return true;
  }

  /**
   * Try to access a block on a datanode. Throws an IOException if the access fails.
   * @param datanode
   * @param lblock
   * @throws IOException
   */
  private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
    throws IOException {
    InetSocketAddress targetAddr = null;
    ExtendedBlock block = lblock.getBlock();

    targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());

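    // Build a BlockReader directly against the chosen datanode and close it
    // right away; we only care whether setting up the read succeeds or throws.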
    BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
      setInetSocketAddress(targetAddr).
      setBlock(block).
      setFileName(BlockReaderFactory.getFileName(targetAddr,
                    "test-blockpoolid", block.getBlockId())).
      setBlockToken(lblock.getBlockToken()).
      setStartOffset(0).
      setLength(-1).
      setVerifyChecksum(true).
      setClientName("TestDataNodeVolumeFailure").
      setDatanodeInfo(datanode).
      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
      setClientCacheContext(ClientContext.getFromConf(conf)).
      setConfiguration(conf).
      setRemotePeerFactory(new RemotePeerFactory() {
        @Override
        public Peer newConnectedPeer(InetSocketAddress addr,
            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
            throws IOException {
          Peer peer = null;
          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
          try {
            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
            peer = TcpPeerServer.peerFromSocket(sock);
          } finally {
            if (peer == null) {
              IOUtils.closeSocket(sock);
            }
          }
          return peer;
        }
      }).
      build();
    blockReader.close();
  }

  /**
   * Count the datanodes that have copies of the blocks for a file
   * and record the counts in the map.
   * @param map
   * @param path
   * @param size
   * @return total number of block locations reported by the NameNode
   * @throws IOException
   */
  private int countNNBlocks(Map<String, BlockLocs> map, String path, long size)
    throws IOException {
    int total = 0;

    NamenodeProtocols nn = cluster.getNameNodeRpc();
    List<LocatedBlock> locatedBlocks =
      nn.getBlockLocations(path, 0, size).getLocatedBlocks();
    //System.out.println("Number of blocks: " + locatedBlocks.size());

    for(LocatedBlock lb : locatedBlocks) {
      String blockId = ""+lb.getBlock().getBlockId();
      //System.out.print(blockId + ": ");
      DatanodeInfo[] dn_locs = lb.getLocations();
      BlockLocs bl = map.get(blockId);
      if(bl == null) {
        bl = new BlockLocs();
      }
      //System.out.print(dn_info.name+",");
      total += dn_locs.length;
      bl.num_locs += dn_locs.length;
      map.put(blockId, bl);
      //System.out.println();
    }
    return total;
  }

  /**
   * Look for real blocks by counting *.meta files in all the storage dirs.
   * @param map
   * @return total number of block metadata files found
   */
  private int countRealBlocks(Map<String, BlockLocs> map) {
    int total = 0;
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    for(int i=0; i<dn_num; i++) {
      for(int j=0; j<=1; j++) {
        File storageDir = cluster.getInstanceStorageDir(i, j);
        File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
        if(dir == null) {
          System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
          continue;
        }

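        // Collect the *.meta files under this volume's finalized directory;
        // each one corresponds to a block replica stored on the volume.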
        List<File> res = MiniDFSCluster.getAllBlockMetadataFiles(dir);
        if(res == null) {
          System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
          continue;
        }
        //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files");

        //int ii = 0;
        for(File f: res) {
          String s = f.getName();
          // cut off the "blk_" prefix and the "_<genstamp>.meta" suffix to get the block id
          assertNotNull("Block file name should not be null", s);
          String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_"));
          //System.out.println(ii++ + ". block " + s + "; id=" + bid);
          BlockLocs val = map.get(bid);
          if(val == null) {
            val = new BlockLocs();
          }
          val.num_files ++; // one more file for the block
          map.put(bid, val);
        }
        //System.out.println("dir1="+dir.getPath() + "blocks=" + res.length);
        //System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length);

        total += res.size();
      }
    }
    return total;
  }
}