/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.Time;
import org.junit.Test;

/**
 * This class tests the replication and injection of blocks of a DFS file,
 * using simulated storage.
 */
public class TestInjectionForSimulatedStorage {
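  // Sizes are deliberately tiny: each block holds two checksum chunks, so
  // the test file spans numBlocks blocks while remaining very small.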
  private final int checksumSize = 16;
  private final int blockSize = checksumSize * 2;
  private final int numBlocks = 4;
  private final int filesize = blockSize * numBlocks;
  private final int numDataNodes = 4;
  private static final Log LOG = LogFactory.getLog(
      "org.apache.hadoop.hdfs.TestInjectionForSimulatedStorage");

  private void writeFile(FileSystem fileSys, Path name, int repl)
      throws IOException {
    // Create and write a file that contains numBlocks blocks of data.
    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
        (short) repl, blockSize);
    byte[] buffer = new byte[filesize];
    for (int i = 0; i < buffer.length; i++) {
      buffer[i] = '1';
    }
    stm.write(buffer);
    stm.close();
  }

  // Waits for all of the blocks to reach the expected replication factor.
  private void waitForBlockReplication(String filename,
                                       ClientProtocol namenode,
                                       int expected, long maxWaitSec)
                                       throws IOException {
    long start = Time.monotonicNow();

    // Wait for all the blocks to be replicated.
    LOG.info("Checking for block replication for " + filename);

    LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
    assertEquals(numBlocks, blocks.locatedBlockCount());

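    // Poll each block's locations every 500 ms until it has the expected
    // number of replicas; a non-positive maxWaitSec means wait indefinitely,
    // otherwise time out after maxWaitSec seconds.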
    for (int i = 0; i < numBlocks; ++i) {
      LOG.info("Checking for block: " + (i + 1));
      while (true) { // Loop to check block i (usually once block 0 is done, all are done).
        blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
        assertEquals(numBlocks, blocks.locatedBlockCount());
        LocatedBlock block = blocks.get(i);
        int actual = block.getLocations().length;
        if (actual == expected) {
          LOG.info("Got enough replicas for block " + (i + 1) + " " +
              block.getBlock() + ", got " + actual + ".");
          break;
        }
        LOG.info("Not enough replicas for block " + (i + 1) + " " +
            block.getBlock() + " yet. Expecting " + expected + ", got " +
            actual + ".");

        if (maxWaitSec > 0 &&
            (Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
          throw new IOException("Timed out while waiting for all blocks to" +
                                " be replicated for " + filename);
        }

        try {
          Thread.sleep(500);
        } catch (InterruptedException ignored) {}
      }
    }
  }


  /* This test verifies that the NameNode re-replicates under-replicated
   * blocks from whatever valid replicas remain. It uses simulated storage
   * and its ability to inject blocks.
   *
   * It creates a file with several blocks and a replication factor of 4.
   * The cluster is then shut down: the NN retains its state, but the DNs
   * are all simulated and hence lose their blocks.
   * The blocks are then injected into one of the DNs. The expected
   * behaviour is that the NN arranges for the missing replicas to be
   * copied from a valid source.
   */
  @Test
  public void testInjection() throws IOException {

    MiniDFSCluster cluster = null;

    String testFile = "/replication-test-file";
    Path testPath = new Path(testFile);

    try {
      Configuration conf = new HdfsConfiguration();
      conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
      conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, checksumSize);
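      // Simulated storage keeps block metadata in memory instead of writing
      // real data to disk, which is what allows the test to inject blocks
      // into a DataNode further down.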
      SimulatedFSDataset.setFactory(conf);
      // The first start formats the cluster.
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
      cluster.waitActive();
      String bpid = cluster.getNamesystem().getBlockPoolId();
      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                            cluster.getNameNodePort()),
                                            conf);

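      // Write the file at full replication, wait (up to 20 seconds) for every
      // block to reach numDataNodes replicas, then capture each DN's block
      // report so the blocks can be re-injected after the restart.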
      writeFile(cluster.getFileSystem(), testPath, numDataNodes);
      waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
      List<Map<DatanodeStorage, BlockListAsLongs>> blocksList = cluster.getAllBlockReports(bpid);

      cluster.shutdown();
      cluster = null;

      /* Restart the MiniDFSCluster with more datanodes: once a writeBlock to
       * a datanode fails, the same block cannot be written to it again
       * immediately, and in our case some replication attempts will fail.
       */

      LOG.info("Restarting minicluster");
      conf = new HdfsConfiguration();
      SimulatedFSDataset.setFactory(conf);
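      // After the restart the DNs report no blocks, so any non-zero safemode
      // threshold would keep the NN in safemode indefinitely; zero lets it
      // exit safemode and start scheduling re-replication.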
      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");

      cluster = new MiniDFSCluster.Builder(conf)
                                  .numDataNodes(numDataNodes * 2)
                                  .format(false)
                                  .build();
      cluster.waitActive();
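      // Every DN held a replica of every block (replication == numDataNodes),
      // so deduplicate the saved per-storage reports into one set of blocks.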
      Set<Block> uniqueBlocks = new HashSet<Block>();
      for (Map<DatanodeStorage, BlockListAsLongs> map : blocksList) {
        for (BlockListAsLongs blockList : map.values()) {
          for (Block b : blockList) {
            uniqueBlocks.add(new Block(b));
          }
        }
      }

      // Insert all the blocks into the first datanode.
      LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
      cluster.injectBlocks(0, uniqueBlocks, null);
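      // Every block now has exactly one replica, all on DN 0. The NN should
      // detect the under-replication and copy the missing replicas from that
      // single valid source, which the final wait below verifies.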

      dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                  cluster.getNameNodePort()),
                                  conf);

      waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1);

    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}