/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Fine-grained testing of block files and locations after volume failure.
 */
public class TestDataNodeVolumeFailure {
  final private int block_size = 512;
  MiniDFSCluster cluster = null;
  private Configuration conf;
  final int dn_num = 2;
  final int blocks_num = 30;
  final short repl = 2;
  File dataDir = null;
  File data_fail = null;
  File failedDir = null;
  private FileSystem fs;

  // maps blocks to meta files (physical files) and locs (NameNode locations)
  private class BlockLocs {
    public int num_files = 0;
    public int num_locs = 0;
  }

  // block id to BlockLocs
  final Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs>();

  @Before
  public void setUp() throws Exception {
    // bring up a cluster of 2 datanodes
    conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size);
    // Allow a single volume failure (there are two volumes per datanode)
    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    dataDir = new File(cluster.getDataDirectory());
  }

  @After
  public void tearDown() throws Exception {
    if (data_fail != null) {
      FileUtil.setWritable(data_fail, true);
    }
    if (failedDir != null) {
      FileUtil.setWritable(failedDir, true);
    }
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /*
   * Verify that the number of blocks and files are correct after volume
   * failure, and that we can replicate to both datanodes even after a single
   * volume failure if the configuration parameter allows this.
   */
  @Test
  public void testVolumeFailure() throws Exception {
    System.out.println("Data dir: is " + dataDir.getPath());

    // Data dir structure is dataDir/data[1-4]/[current,tmp...]
    // data1,2 is for datanode 1; data3,4 is for datanode 2
    String filename = "/test.txt";
    Path filePath = new Path(filename);

    // use only a small number of blocks to avoid creating subdirs in the data dir
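    // With block_size = 512 and blocks_num = 30, the file is 512 * 30 = 15360
    // bytes, so at repl = 2 each of the two datanodes ends up holding a replica
    // of all 30 blocks.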
    int filesize = block_size * blocks_num;
    DFSTestUtil.createFile(fs, filePath, filesize, repl, 1L);
    DFSTestUtil.waitReplication(fs, filePath, repl);
    System.out.println("file " + filename + " (size " +
        filesize + ") is created and replicated");

    // fail the volume
    // delete/make non-writable one of the directories (failed volume)
    data_fail = new File(dataDir, "data3");
    failedDir = MiniDFSCluster.getFinalizedDir(dataDir,
        cluster.getNamesystem().getBlockPoolId());
    if (failedDir.exists() &&
        //!FileUtil.fullyDelete(failedDir)
        !deleteBlocks(failedDir)
        ) {
      throw new IOException("Could not delete hdfs directory '" + failedDir + "'");
    }
    data_fail.setReadOnly();
    failedDir.setReadOnly();
    System.out.println("Deleting " + failedDir.getPath() + "; exist=" + failedDir.exists());

    // access all the blocks on the "failed" DataNode;
    // we need to make sure that the "failed" volume is being accessed -
    // and that will cause failure, blocks removal, "emergency" block report
    triggerFailure(filename, filesize);

    // make sure a block report is sent
    DataNode dn = cluster.getDataNodes().get(1); // corresponds to dir data3
    String bpid = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);

    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpid);

    // Send block report
    StorageBlockReport[] reports =
        new StorageBlockReport[perVolumeBlockLists.size()];

    int reportIndex = 0;
    for (Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
      DatanodeStorage dnStorage = kvPair.getKey();
      BlockListAsLongs blockList = kvPair.getValue();
      reports[reportIndex++] =
          new StorageBlockReport(dnStorage, blockList);
    }

    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);

    // verify number of blocks and files...
    verify(filename, filesize);

    // create another file (with one volume failed).
    System.out.println("creating file test1.txt");
    Path fileName1 = new Path("/test1.txt");
    DFSTestUtil.createFile(fs, fileName1, filesize, repl, 1L);

    // should be able to replicate to both nodes (2 DN, repl=2)
    DFSTestUtil.waitReplication(fs, fileName1, repl);
    System.out.println("file " + fileName1.getName() +
        " is created and replicated");
  }

  /**
   * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
   * after failure.
   */
  @Test(timeout=150000)
  public void testFailedVolumeBeingRemovedFromDataNode()
      throws InterruptedException, IOException, TimeoutException {
    Path file1 = new Path("/test1");
    DFSTestUtil.createFile(fs, file1, 1024, (short) 2, 1L);
    DFSTestUtil.waitReplication(fs, file1, (short) 2);

    File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
    DataNode dn0 = cluster.getDataNodes().get(0);
    long lastDiskErrorCheck = dn0.getLastDiskErrorCheck();
    dn0.checkDiskErrorAsync();
    // Wait for the checkDiskError thread to finish and discover the volume failure.
    while (dn0.getLastDiskErrorCheck() == lastDiskErrorCheck) {
      Thread.sleep(100);
    }

    // Verify dn0Vol1 has been completely removed from DN0.
    // 1. dn0Vol1 is removed from DataStorage.
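    //    (MiniDFSCluster configures two data dirs per datanode, so exactly one
    //    storage dir should remain after the failed volume is dropped.)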
    DataStorage storage = dn0.getStorage();
    assertEquals(1, storage.getNumStorageDirs());
    for (int i = 0; i < storage.getNumStorageDirs(); i++) {
      Storage.StorageDirectory sd = storage.getStorageDir(i);
      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
          dn0Vol1.getAbsolutePath()
      ));
    }
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    BlockPoolSliceStorage bpsStorage = storage.getBPStorage(bpid);
    assertEquals(1, bpsStorage.getNumStorageDirs());
    for (int i = 0; i < bpsStorage.getNumStorageDirs(); i++) {
      Storage.StorageDirectory sd = bpsStorage.getStorageDir(i);
      assertFalse(sd.getRoot().getAbsolutePath().startsWith(
          dn0Vol1.getAbsolutePath()
      ));
    }

    // 2. dn0Vol1 is removed from FsDataset
    FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
    for (FsVolumeSpi volume : data.getVolumes()) {
      assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
          dn0Vol1.getAbsoluteFile());
    }

    // 3. all blocks on dn0Vol1 have been removed.
    for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
      assertNotNull(replica.getVolume());
      assertNotEquals(
          new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
          dn0Vol1.getAbsoluteFile());
    }

    // 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.
    String[] dataDirStrs =
        dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
    assertEquals(1, dataDirStrs.length);
    assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
  }

  /**
   * Test that there are under-replicated blocks after volume failures.
   */
  @Test
  public void testUnderReplicationAfterVolFailure() throws Exception {
    // This test relies on denying access to data volumes to simulate data volume
    // failure. This doesn't work on Windows, because an owner of an object
    // always has the ability to read and change permissions on the object.
    assumeTrue(!Path.WINDOWS);

    // Bring up one more datanode
    cluster.startDataNodes(conf, 1, true, null, null);
    cluster.waitActive();

    final BlockManager bm = cluster.getNamesystem().getBlockManager();

    Path file1 = new Path("/test1");
    DFSTestUtil.createFile(fs, file1, 1024, (short) 3, 1L);
    DFSTestUtil.waitReplication(fs, file1, (short) 3);

    // Fail the first volume on both datanodes
    File dn1Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
    File dn2Vol1 = new File(dataDir, "data" + (2 * 1 + 1));
    DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1);

    Path file2 = new Path("/test2");
    DFSTestUtil.createFile(fs, file2, 1024, (short) 3, 1L);
    DFSTestUtil.waitReplication(fs, file2, (short) 3);

    // underReplicatedBlocks are due to failed volumes
    int underReplicatedBlocks =
        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
            cluster.getNamesystem(), bm);
    assertTrue("There is no under replicated block after volume failure",
        underReplicatedBlocks > 0);
  }

  /**
   * Verifies two things:
   *  1. the number of locations of each block reported by the NameNode
   *     matches the number of actual block files
   *  2. block files + pending blocks add up to the total number of blocks
   *     the file has, including replication (the HDFS file has 30 blocks,
   *     repl=2 - total 60)
   * @param fn file name
   * @param fs file size
   * @throws IOException
   */
  private void verify(String fn, int fs) throws IOException {
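    // Accounting after the volume failure: some of the blocks_num * repl
    // replicas still exist on disk (totalReal below), the rest are being
    // re-replicated (under-replicated + pending), and the two counts should
    // add up to blocks_num * repl.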
    // now count how many physical blocks are there
    int totalReal = countRealBlocks(block_map);
    System.out.println("countRealBlocks counted " + totalReal + " blocks");

    // count how many blocks are stored in NN structures.
    int totalNN = countNNBlocks(block_map, fn, fs);
    System.out.println("countNNBlocks counted " + totalNN + " blocks");

    for (String bid : block_map.keySet()) {
      BlockLocs bl = block_map.get(bid);
      // System.out.println(bid + "->" + bl.num_files + "vs." + bl.num_locs);
      // the number of physical files (1 or 2) should equal the number of
      // datanodes in the list of the block locations
      assertEquals("Num files should match num locations",
          bl.num_files, bl.num_locs);
    }
    assertEquals("Num physical blocks should match num stored in the NN",
        totalReal, totalNN);

    // now check the number of under-replicated blocks
    FSNamesystem fsn = cluster.getNamesystem();
    // force update of all the metric counts by calling computeDatanodeWork
    BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager());
    // get all the counts
    long underRepl = fsn.getUnderReplicatedBlocks();
    long pendRepl = fsn.getPendingReplicationBlocks();
    long totalRepl = underRepl + pendRepl;
    System.out.println("underreplicated after = " + underRepl +
        " and pending repl = " + pendRepl + "; total underRepl = " + totalRepl);

    System.out.println("total blocks (real and replicating): " +
        (totalReal + totalRepl) + " vs. all files blocks " + blocks_num * 2);

    // together, the real and replicating blocks should equal all the file's blocks
    assertEquals("Incorrect total block count",
        totalReal + totalRepl, blocks_num * repl);
  }

  /**
   * Access each block of the file on the 2nd DataNode until an access fails...
   * @param path
   * @param size
   * @throws IOException
   */
  private void triggerFailure(String path, long size) throws IOException {
    NamenodeProtocols nn = cluster.getNameNodeRpc();
    List<LocatedBlock> locatedBlocks =
        nn.getBlockLocations(path, 0, size).getLocatedBlocks();

    for (LocatedBlock lb : locatedBlocks) {
      DatanodeInfo dinfo = lb.getLocations()[1];
      ExtendedBlock b = lb.getBlock();
      try {
        accessBlock(dinfo, lb);
      } catch (IOException e) {
        System.out.println("Failure triggered, on block: " + b.getBlockId() +
            "; corresponding volume should be removed by now");
        break;
      }
    }
  }

  /**
   * Simulate failure by deleting all the block files.
   * @param dir
   */
  private boolean deleteBlocks(File dir) {
    File[] fileList = dir.listFiles();
    for (File f : fileList) {
      if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX)) {
        if (!f.delete()) {
          return false;
        }
      }
    }
    return true;
  }
  /**
   * Try to access a block on a data node. Throws an exception if the access
   * fails.
   * @param datanode
   * @param lblock
   * @throws IOException
   */
  private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
      throws IOException {
    InetSocketAddress targetAddr = null;
    ExtendedBlock block = lblock.getBlock();

    targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());

    BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
        setInetSocketAddress(targetAddr).
        setBlock(block).
        setFileName(BlockReaderFactory.getFileName(targetAddr,
            "test-blockpoolid", block.getBlockId())).
        setBlockToken(lblock.getBlockToken()).
        setStartOffset(0).
        setLength(-1).
        setVerifyChecksum(true).
        setClientName("TestDataNodeVolumeFailure").
        setDatanodeInfo(datanode).
        setCachingStrategy(CachingStrategy.newDefaultStrategy()).
        setClientCacheContext(ClientContext.getFromConf(conf)).
        setConfiguration(conf).
        setRemotePeerFactory(new RemotePeerFactory() {
          @Override
          public Peer newConnectedPeer(InetSocketAddress addr,
              Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
              throws IOException {
            Peer peer = null;
            Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
            try {
              sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
              sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
              peer = TcpPeerServer.peerFromSocket(sock);
            } finally {
              if (peer == null) {
                IOUtils.closeSocket(sock);
              }
            }
            return peer;
          }
        }).
        build();
    blockReader.close();
  }

  /**
   * Count the datanodes that hold copies of each block of a file and record
   * the counts in the map.
   * @param map
   * @param path
   * @param size
   * @return the total number of block locations reported by the NameNode
   * @throws IOException
   */
  private int countNNBlocks(Map<String, BlockLocs> map, String path, long size)
      throws IOException {
    int total = 0;

    NamenodeProtocols nn = cluster.getNameNodeRpc();
    List<LocatedBlock> locatedBlocks =
        nn.getBlockLocations(path, 0, size).getLocatedBlocks();
    //System.out.println("Number of blocks: " + locatedBlocks.size());

    for (LocatedBlock lb : locatedBlocks) {
      String blockId = "" + lb.getBlock().getBlockId();
      //System.out.print(blockId + ": ");
      DatanodeInfo[] dn_locs = lb.getLocations();
      BlockLocs bl = map.get(blockId);
      if (bl == null) {
        bl = new BlockLocs();
      }
      //System.out.print(dn_info.name + ",");
      total += dn_locs.length;
      bl.num_locs += dn_locs.length;
      map.put(blockId, bl);
      //System.out.println();
    }
    return total;
  }

  /**
   * Look for real blocks by counting *.meta files in all the storage dirs.
   * @param map
   * @return the number of block metadata files found
   */
  private int countRealBlocks(Map<String, BlockLocs> map) {
    int total = 0;
    final String bpid = cluster.getNamesystem().getBlockPoolId();
    for (int i = 0; i < dn_num; i++) {
      for (int j = 0; j <= 1; j++) {
        File storageDir = cluster.getInstanceStorageDir(i, j);
        File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
        if (dir == null) {
          System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
          continue;
        }

        List<File> res = MiniDFSCluster.getAllBlockMetadataFiles(dir);
        if (res == null) {
          System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
          continue;
        }
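        // Each finalized replica has a blk_<blockId>_<genstamp>.meta file; parse
        // the block id out of the file name and count one file per block.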
        //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.size() + " files");

        //int ii = 0;
        for (File f : res) {
          String s = f.getName();
          // cut off "blk_-" at the beginning and ".meta" at the end
          assertNotNull("Block file name should not be null", s);
          String bid = s.substring(s.indexOf("_") + 1, s.lastIndexOf("_"));
          //System.out.println(ii++ + ". block " + s + "; id=" + bid);
          BlockLocs val = map.get(bid);
          if (val == null) {
            val = new BlockLocs();
          }
          val.num_files++; // one more file for the block
          map.put(bid, val);
        }
        //System.out.println("dir1=" + dir.getPath() + " blocks=" + res.size());
        //System.out.println("dir2=" + dir2.getPath() + " blocks=" + res2.size());

        total += res.size();
      }
    }
    return total;
  }
}