/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;

/**
 * File Append tests for HDFS-200 &amp; HDFS-142, specifically focused on
 * using append()/sync() to recover block information.
 *
 * <p>Each test starts its own {@link MiniDFSCluster} and shuts it down in a
 * {@code finally} block.
 */
public class TestFileAppend4 {
  static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
  static final long BLOCK_SIZE = 1024;
  static final long BBW_SIZE = 500; // don't align on bytes/checksum

  static final Object [] NO_ARGS = new Object []{};

  /** Configuration built by {@link #setUp()} before every test. */
  Configuration conf;
  /** Cluster under test; created by the individual test methods. */
  MiniDFSCluster cluster;
  /** Path of the file that {@link #recoverFile(FileSystem)} recovers. */
  Path file1;
  FSDataOutputStream stm;
  /** When true, {@link #setUp()} installs the simulated dataset factory. */
  final boolean simulatedStorage = false;

  // Instance initializer: turn NameNode, DataNode and DFSClient logging up
  // to Level.ALL for every test instance.
  {
    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
  }

  /**
   * Builds the shared {@link #conf} with short heartbeat, socket-timeout and
   * replication intervals so the tests react quickly to failures.
   *
   * @throws Exception declared for JUnit; nothing here is expected to throw
   */
  @Before
  public void setUp() throws Exception {
    this.conf = new Configuration();
    if (simulatedStorage) {
      SimulatedFSDataset.setFactory(conf);
    }

    // lower heartbeat interval for fast recognition of DN death
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        1000);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
    // handle under-replicated blocks quickly (for replication asserts)
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);

    // handle failures in the DFSClient pipeline quickly
    // (for cluster.shutdown(); fs.close() idiom)
    conf.setInt("ipc.client.connect.max.retries", 1);
  }

  /**
   * Recover file.
   * Try and open file in append mode.
   * Doing this, we get a hold of the file that crashed writer
   * was writing to.  Once we have it, close it.  This will
   * allow subsequent reader to see up to last sync.
   * NOTE: This is the same algorithm that HBase uses for file recovery
   *
   * <p>The append is retried up to 60 times with a one second pause between
   * attempts; the test fails if none of them succeeds. If the waiting thread
   * is interrupted (for example by the JUnit timeout), the interrupt status
   * is restored and the test fails immediately instead of continuing to
   * retry.
   *
   * @param fs file system used to re-open {@link #file1} for append
   * @throws Exception if closing the recovered stream fails
   */
  private void recoverFile(final FileSystem fs) throws Exception {
    LOG.info("Recovering File Lease");

    // set the soft limit to be 1 second so that the
    // namenode triggers lease recovery upon append request
    cluster.setLeasePeriod(1000, HdfsConstants.LEASE_HARDLIMIT_PERIOD);

    // Trying recovery
    int tries = 60;
    boolean recovered = false;
    FSDataOutputStream out = null;
    while (!recovered && tries-- > 0) {
      try {
        out = fs.append(file1);
        LOG.info("Successfully opened for append");
        recovered = true;
      } catch (IOException e) {
        LOG.info("Failed open for append, waiting on lease recovery");
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          // Do not swallow the interrupt: restore the flag for callers and
          // stop retrying. 'out' is still null here, so nothing leaks.
          Thread.currentThread().interrupt();
          fail("Interrupted while waiting for lease recovery");
        }
      }
    }
    if (out != null) {
      out.close();
    }
    if (!recovered) {
      fail("Recovery should take < 1 min");
    }
    LOG.info("Past out lease recovery");
  }

  /**
   * Starts a thread that closes {@code out} and records anything thrown by
   * {@code close()} in {@code err}.
   *
   * @param out stream to close on the background thread
   * @param err receives the {@link Throwable} thrown by {@code close()}, if any
   * @return the already started thread, so the caller can {@code join()} it
   */
  private static Thread closeInBackground(final OutputStream out,
      final AtomicReference<Throwable> err) {
    Thread closer = new Thread() {
      @Override
      public void run() {
        try {
          out.close();
        } catch (Throwable t) {
          err.set(t);
        }
      }
    };
    closer.start();
    return closer;
  }

  /**
   * Verifies that the background close failed with an {@link IOException}
   * whose message contains {@code expectedFragment}.
   *
   * @param thrownByClose what the background close recorded; must not be null
   * @param expectedFragment text the exception message has to contain
   * @throws Throwable {@code thrownByClose} itself when its message is
   *         missing or does not contain the expected text, so the real
   *         failure shows up in the test report
   */
  private static void assertCloseFailedWith(Throwable thrownByClose,
      String expectedFragment) throws Throwable {
    assertNotNull("close() was expected to fail", thrownByClose);
    assertTrue("close() should fail with an IOException but got "
        + thrownByClose, thrownByClose instanceof IOException);
    String message = thrownByClose.getMessage();
    // A null message used to trigger an NPE that hid the original exception.
    if (message == null || !message.contains(expectedFragment)) {
      throw thrownByClose;
    }
  }

  /**
   * Test case that stops a writer after finalizing a block but
   * before calling completeFile, and then tries to recover
   * the lease from another thread.
   */
  @Test(timeout=60000)
  public void testRecoverFinalizedBlock() throws Throwable {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();

    try {
      cluster.waitActive();
      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
      NamenodeProtocols spyNN = spy(preSpyNN);

      // Delay completeFile
      GenericTestUtils.DelayAnswer delayer =
          new GenericTestUtils.DelayAnswer(LOG);
      doAnswer(delayer).when(spyNN).complete(
          anyString(), anyString(), (ExtendedBlock)anyObject(), anyLong());

      DFSClient client = new DFSClient(null, spyNN, conf, null);
      file1 = new Path("/testRecoverFinalized");
      final OutputStream stm = client.create("/testRecoverFinalized", true);

      // write 4096 bytes of test data
      AppendTestUtil.write(stm, 0, 4096);
      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
      Thread t = closeInBackground(stm, err);
      LOG.info("Waiting for close to get to latch...");
      delayer.waitForCall();

      // At this point, the block is finalized on the DNs, but the file
      // has not been completed in the NN.
      // Lose the leases
      LOG.info("Killing lease checker");
      client.getLeaseRenewer().interruptAndJoin();

      FileSystem fs1 = cluster.getFileSystem();
      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
          fs1.getConf());

      LOG.info("Recovering file");
      recoverFile(fs2);

      LOG.info("Telling close to proceed.");
      delayer.proceed();
      LOG.info("Waiting for close to finish.");
      t.join();
      LOG.info("Close finished.");

      // We expect that close will get a "No lease on <file>" error,
      // because the lease was recovered by the other client.
      assertCloseFailedWith(err.get(), "No lease on /testRecoverFinalized");
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test case that stops a writer after finalizing a block but
   * before calling completeFile, recovers a file from another writer,
   * starts writing from that writer, and then has the old lease holder
   * call completeFile
   */
  @Test(timeout=60000)
  public void testCompleteOtherLeaseHoldersFile() throws Throwable {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();

    try {
      cluster.waitActive();
      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
      NamenodeProtocols spyNN = spy(preSpyNN);

      // Delay completeFile
      GenericTestUtils.DelayAnswer delayer =
          new GenericTestUtils.DelayAnswer(LOG);
      doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
          (ExtendedBlock) anyObject(), anyLong());

      DFSClient client = new DFSClient(null, spyNN, conf, null);
      file1 = new Path("/testCompleteOtherLease");
      final OutputStream stm = client.create("/testCompleteOtherLease", true);

      // write 4096 bytes of test data
      AppendTestUtil.write(stm, 0, 4096);
      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
      Thread t = closeInBackground(stm, err);
      LOG.info("Waiting for close to get to latch...");
      delayer.waitForCall();

      // At this point, the block is finalized on the DNs, but the file
      // has not been completed in the NN.
      // Lose the leases
      LOG.info("Killing lease checker");
      client.getLeaseRenewer().interruptAndJoin();

      FileSystem fs1 = cluster.getFileSystem();
      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
          fs1.getConf());

      LOG.info("Recovering file");
      recoverFile(fs2);

      LOG.info("Opening file for append from new fs");
      FSDataOutputStream appenderStream = fs2.append(file1);

      LOG.info("Writing some data from new appender");
      AppendTestUtil.write(appenderStream, 0, 4096);

      LOG.info("Telling old close to proceed.");
      delayer.proceed();
      LOG.info("Waiting for close to finish.");
      t.join();
      LOG.info("Close finished.");

      // We expect that close will get a "Lease mismatch"
      // error.
      assertCloseFailedWith(err.get(), "Lease mismatch");

      // The appender should be able to close properly
      appenderStream.close();
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test the updating of NeededReplications for the appended block: a file
   * created with replication 2 on a one-datanode cluster is appended to, and
   * must reach replication 2 once a second datanode is started.
   *
   * <p>Note: this test deliberately uses its own local configuration and
   * cluster, which shadow the {@link #conf} and {@link #cluster} fields.
   */
  @Test(timeout = 60000)
  public void testUpdateNeededReplicationsForAppendedFile() throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
        .build();
    DistributedFileSystem fileSystem = null;
    try {
      // create a file.
      fileSystem = cluster.getFileSystem();
      Path f = new Path("/testAppend");
      FSDataOutputStream create = fileSystem.create(f, (short) 2);
      create.write("/testAppend".getBytes());
      create.close();

      // Append to the file.
      FSDataOutputStream append = fileSystem.append(f);
      append.write("/testAppend".getBytes());
      append.close();

      // Start a new datanode
      cluster.startDataNodes(conf, 1, true, null, null);

      // Check for replications
      DFSTestUtil.waitReplication(fileSystem, f, (short) 2);
    } finally {
      if (null != fileSystem) {
        fileSystem.close();
      }
      cluster.shutdown();
    }
  }

  /**
   * Test that an append with no locations fails with an exception
   * showing insufficient locations.
   *
   * <p>Note: the local configuration shadows the {@link #conf} field, while
   * the cluster is stored in the {@link #cluster} field.
   */
  @Test(timeout = 60000)
  public void testAppendInsufficientLocations() throws Exception {
    Configuration conf = new Configuration();

    // lower heartbeat interval for fast recognition of DN
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        1000);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000);

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
        .build();
    DistributedFileSystem fileSystem = null;
    try {
      // create a file with replication 2
      fileSystem = cluster.getFileSystem();
      Path f = new Path("/testAppend");
      FSDataOutputStream create = fileSystem.create(f, (short) 2);
      create.write("/testAppend".getBytes());
      create.close();

      // Check for replications
      DFSTestUtil.waitReplication(fileSystem, f, (short) 2);

      // Shut down all DNs that have the last block location for the file
      LocatedBlocks lbs = fileSystem.dfs.getNamenode().
          getBlockLocations("/testAppend", 0, Long.MAX_VALUE);
      List<DataNode> dnsOfCluster = cluster.getDataNodes();
      DatanodeInfo[] dnsWithLocations = lbs.getLastLocatedBlock().
          getLocations();
      for (DataNode dn : dnsOfCluster) {
        for (DatanodeInfo loc : dnsWithLocations) {
          if (dn.getDatanodeId().equals(loc)) {
            dn.shutdown();
            DFSTestUtil.waitForDatanodeDeath(dn);
          }
        }
      }

      // Wait till 0 replication is recognized
      DFSTestUtil.waitReplication(fileSystem, f, (short) 0);

      // Append to the file. At this state the remaining DNs are still live
      // (2 of the 4 started, assuming both replica holders were shut down)
      // but none of them have the block.
      try {
        fileSystem.append(f);
        fail("Append should fail because insufficient locations");
      } catch (IOException e) {
        LOG.info("Expected exception: ", e);
      }
      FSDirectory dir = cluster.getNamesystem().getFSDirectory();
      final INodeFile inode = INodeFile.
          valueOf(dir.getINode("/testAppend"), "/testAppend");
      assertTrue("File should remain closed", !inode.isUnderConstruction());
    } finally {
      if (null != fileSystem) {
        fileSystem.close();
      }
      cluster.shutdown();
    }
  }
}