/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.Time;
import org.junit.Test;


/**
 * This class tests the replication and injection of blocks of a DFS file for
 * simulated storage.
 */
public class TestInjectionForSimulatedStorage {
  private final int checksumSize = 16;
  private final int blockSize = checksumSize*2;
  private final int numBlocks = 4;
  private final int filesize = blockSize*numBlocks;
  private final int numDataNodes = 4;
  private static final Log LOG = LogFactory.getLog(
      "org.apache.hadoop.hdfs.TestInjectionForSimulatedStorage");

  /**
   * Creates (overwriting if present) a file of {@code filesize} bytes, i.e.
   * {@code numBlocks} blocks of {@code blockSize} bytes, filled with the
   * byte {@code '1'}.
   *
   * @param fileSys file system to create the file on
   * @param name path of the file to create
   * @param repl replication factor requested for the file
   * @throws IOException if the file cannot be created, written or closed
   */
  private void writeFile(FileSystem fileSys, Path name, int repl)
      throws IOException {
    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
        (short) repl, blockSize);
    try {
      byte[] buffer = new byte[filesize];
      for (int i = 0; i < buffer.length; i++) {
        buffer[i] = '1';
      }
      stm.write(buffer);
    } finally {
      // Close even when the write fails so the stream is not leaked.
      stm.close();
    }
  }

  /**
   * Waits for all of the blocks of a file to have the expected replication.
   * The block locations are polled every 500 ms, one block at a time.
   *
   * @param filename path of the file whose blocks are checked
   * @param namenode protocol handle used to query block locations
   * @param expected number of locations each block must report
   * @param maxWaitSec maximum time to wait, in seconds, measured from the
   *        start of this call; a value of zero or less waits indefinitely
   * @throws IOException if the wait times out, the thread is interrupted
   *         while waiting, or the block locations cannot be fetched
   */
  private void waitForBlockReplication(String filename,
                                       ClientProtocol namenode,
                                       int expected, long maxWaitSec)
                                       throws IOException {
    long start = Time.monotonicNow();

    // wait for all the blocks to be replicated
    LOG.info("Checking for block replication for " + filename);

    LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
    assertEquals(numBlocks, blocks.locatedBlockCount());

    for (int i = 0; i < numBlocks; ++i) {
      LOG.info("Checking for block:" + (i+1));
      // Loop to check for block i (usually when block 0 is done, all will
      // be done).
      while (true) {
        blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
        assertEquals(numBlocks, blocks.locatedBlockCount());
        LocatedBlock block = blocks.get(i);
        int actual = block.getLocations().length;
        if (actual == expected) {
          LOG.info("Got enough replicas for " + (i+1) + "th block " + block.getBlock() +
                   ", got " + actual + ".");
          break;
        }
        LOG.info("Not enough replicas for " + (i+1) + "th block " + block.getBlock() +
                 " yet. Expecting " + expected + ", got " +
                 actual + ".");

        if (maxWaitSec > 0 &&
            (Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
          throw new IOException("Timedout while waiting for all blocks to " +
                                " be replicated for " + filename);
        }

        try {
          Thread.sleep(500);
        } catch (InterruptedException e) {
          // Do not swallow the interrupt: restore the flag and stop waiting,
          // otherwise an interrupted test would keep polling (forever when
          // no timeout is set).
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for all blocks to"
              + " be replicated for " + filename, e);
        }
      }
    }
  }

  /**
   * Closes the given client, logging rather than propagating any failure so
   * that cleanup never masks the outcome of the test itself.
   *
   * @param client the client to close; may be {@code null}
   */
  private void closeQuietly(DFSClient client) {
    if (client == null) {
      return;
    }
    try {
      client.close();
    } catch (IOException e) {
      LOG.warn("Failed to close DFSClient", e);
    }
  }

  /* This test makes sure that NameNode retries all the available blocks
   * for under replicated blocks. This test uses simulated storage and one
   * of its features to inject blocks.
   *
   * It creates a file with several blocks and replication of 4.
   * The cluster is then shut down - NN retains its state but the DNs are
   * all simulated and hence lose their blocks.
   * The blocks are then injected in one of the DNs. The expected behaviour is
   * that the NN will arrange for the missing replicas to be copied from a
   * valid source.
   */
  @Test
  public void testInjection() throws IOException {

    MiniDFSCluster cluster = null;
    DFSClient dfsClient = null;

    String testFile = "/replication-test-file";
    Path testPath = new Path(testFile);

    try {
      Configuration conf = new HdfsConfiguration();
      conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
      conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, checksumSize);
      SimulatedFSDataset.setFactory(conf);
      // first time format
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
      cluster.waitActive();
      String bpid = cluster.getNamesystem().getBlockPoolId();
      dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                            cluster.getNameNodePort()),
                                            conf);

      writeFile(cluster.getFileSystem(), testPath, numDataNodes);
      waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
      List<Map<DatanodeStorage, BlockListAsLongs>> blocksList = cluster.getAllBlockReports(bpid);

      // Release the client of the first cluster before that cluster goes
      // away; a fresh client is created after the restart.
      closeQuietly(dfsClient);
      dfsClient = null;

      cluster.shutdown();
      cluster = null;

      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
       * to a datanode node fails, same block can not be written to it
       * immediately. In our case some replication attempts will fail.
       */

      LOG.info("Restarting minicluster");
      conf = new HdfsConfiguration();
      SimulatedFSDataset.setFactory(conf);
      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");

      cluster = new MiniDFSCluster.Builder(conf)
                                  .numDataNodes(numDataNodes * 2)
                                  .format(false)
                                  .build();
      cluster.waitActive();

      // Collapse the per-datanode, per-storage reports taken before the
      // shutdown into one set of distinct blocks.
      Set<Block> uniqueBlocks = new HashSet<Block>();
      for (Map<DatanodeStorage, BlockListAsLongs> map : blocksList) {
        for (BlockListAsLongs blockList : map.values()) {
          for (Block b : blockList) {
            uniqueBlocks.add(new Block(b));
          }
        }
      }

      // Insert all the blocks in the first data node
      LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
      cluster.injectBlocks(0, uniqueBlocks, null);

      dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                                  cluster.getNameNodePort()),
                                conf);

      // No timeout here: wait until every block is back at full replication.
      waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, -1);

    } finally {
      closeQuietly(dfsClient);
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}