/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;

/* File Append tests for HDFS-200 & HDFS-142, specifically focused on:
 *  using append()/sync() to recover block information
 */
public class TestFileAppend4 {
  static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
  static final long BLOCK_SIZE = 1024;
  static final long BBW_SIZE = 500; // don't align on bytes/checksum

  static final Object [] NO_ARGS = new Object []{};

  Configuration conf;
  MiniDFSCluster cluster;
  Path file1;
  FSDataOutputStream stm;
  final boolean simulatedStorage = false;

  {
    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
  }

  @Before
  public void setUp() throws Exception {
    this.conf = new Configuration();
    if (simulatedStorage) {
      SimulatedFSDataset.setFactory(conf);
    }

    // lower heartbeat interval for fast recognition of DN death
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        1000);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
    // handle under-replicated blocks quickly (for replication asserts)
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);

    // handle failures in the DFSClient pipeline quickly
    // (for cluster.shutdown(); fs.close() idiom)
    conf.setInt("ipc.client.connect.max.retries", 1);
  }

  /*
   * Recover the file.
   * Try to open the file in append mode.
   * Doing this, we get hold of the file that the crashed writer
   * was writing to.  Once we have it, close it.  This will
   * allow a subsequent reader to see up to the last sync.
   * NOTE: This is the same algorithm that HBase uses for file recovery
   * @param fs
   * @throws Exception
   */
  private void recoverFile(final FileSystem fs) throws Exception {
    LOG.info("Recovering File Lease");

    // set the soft limit to be 1 second so that the
    // namenode triggers lease recovery upon append request
    cluster.setLeasePeriod(1000, HdfsConstants.LEASE_HARDLIMIT_PERIOD);

    // Trying recovery
    int tries = 60;
    boolean recovered = false;
    FSDataOutputStream out = null;
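    // Retry once per second for up to a minute; append fails with an
    // IOException while the NN is still recovering the previous writer's
    // lease.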
    while (!recovered && tries-- > 0) {
      try {
        out = fs.append(file1);
        LOG.info("Successfully opened for append");
        recovered = true;
      } catch (IOException e) {
        LOG.info("Failed open for append, waiting on lease recovery");
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          // ignore it and try again
        }
      }
    }
    if (out != null) {
      out.close();
    }
    if (!recovered) {
      fail("Recovery should take < 1 min");
    }
    LOG.info("Past out lease recovery");
  }

  /**
   * Test case that stops a writer after finalizing a block but
   * before calling completeFile, and then tries to recover
   * the lease from another thread.
   */
  @Test(timeout=60000)
  public void testRecoverFinalizedBlock() throws Throwable {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();

    try {
      cluster.waitActive();
      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
      NamenodeProtocols spyNN = spy(preSpyNN);

      // Delay completeFile
      GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
      doAnswer(delayer).when(spyNN).complete(
          anyString(), anyString(), (ExtendedBlock) anyObject(), anyLong());
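      // The DelayAnswer holds the intercepted complete() call until
      // proceed() is invoked below, so the writer can be frozen after its
      // block is finalized on the DNs but before the NN completes the file.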

      DFSClient client = new DFSClient(null, spyNN, conf, null);
      file1 = new Path("/testRecoverFinalized");
      final OutputStream stm = client.create("/testRecoverFinalized", true);

      // write 1/2 block
      AppendTestUtil.write(stm, 0, 4096);
      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
      Thread t = new Thread() {
          @Override
          public void run() {
            try {
              stm.close();
            } catch (Throwable t) {
              err.set(t);
            }
          }};
      t.start();
      LOG.info("Waiting for close to get to latch...");
      delayer.waitForCall();

      // At this point, the block is finalized on the DNs, but the file
      // has not been completed in the NN.
      // Lose the leases
      LOG.info("Killing lease checker");
      client.getLeaseRenewer().interruptAndJoin();
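      // With renewal stopped, the original client no longer refreshes its
      // lease, so another client can take the file over once the (shortened)
      // soft limit expires.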

      FileSystem fs1 = cluster.getFileSystem();
      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
        fs1.getConf());
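      // fs2, created under a different username, acts as the recovering
      // client here, mirroring how an external process such as HBase
      // recovers a crashed writer's file (see recoverFile above).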

      LOG.info("Recovering file");
      recoverFile(fs2);

      LOG.info("Telling close to proceed.");
      delayer.proceed();
      LOG.info("Waiting for close to finish.");
      t.join();
      LOG.info("Close finished.");

      // We expect that close will get a "File is not open"
      // error.
      Throwable thrownByClose = err.get();
      assertNotNull(thrownByClose);
      assertTrue(thrownByClose instanceof IOException);
      if (!thrownByClose.getMessage().contains(
            "No lease on /testRecoverFinalized"))
        throw thrownByClose;
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test case that stops a writer after finalizing a block but
   * before calling completeFile, recovers a file from another writer,
   * starts writing from that writer, and then has the old lease holder
   * call completeFile.
   */
  @Test(timeout=60000)
  public void testCompleteOtherLeaseHoldersFile() throws Throwable {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();

    try {
      cluster.waitActive();
      NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
      NamenodeProtocols spyNN = spy(preSpyNN);

      // Delay completeFile
      GenericTestUtils.DelayAnswer delayer =
        new GenericTestUtils.DelayAnswer(LOG);
      doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
          (ExtendedBlock) anyObject(), anyLong());

      DFSClient client = new DFSClient(null, spyNN, conf, null);
      file1 = new Path("/testCompleteOtherLease");
      final OutputStream stm = client.create("/testCompleteOtherLease", true);

      // write 1/2 block
      AppendTestUtil.write(stm, 0, 4096);
      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
      Thread t = new Thread() {
          @Override
          public void run() {
            try {
              stm.close();
            } catch (Throwable t) {
              err.set(t);
            }
          }};
      t.start();
      LOG.info("Waiting for close to get to latch...");
      delayer.waitForCall();

      // At this point, the block is finalized on the DNs, but the file
      // has not been completed in the NN.
      // Lose the leases
      LOG.info("Killing lease checker");
      client.getLeaseRenewer().interruptAndJoin();

      FileSystem fs1 = cluster.getFileSystem();
      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
        fs1.getConf());

      LOG.info("Recovering file");
      recoverFile(fs2);

      LOG.info("Opening file for append from new fs");
      FSDataOutputStream appenderStream = fs2.append(file1);

      LOG.info("Writing some data from new appender");
      AppendTestUtil.write(appenderStream, 0, 4096);
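      // At this point fs2 holds the lease on the file, so the original
      // writer's delayed complete() should be rejected with a lease
      // mismatch when it resumes.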

      LOG.info("Telling old close to proceed.");
      delayer.proceed();
      LOG.info("Waiting for close to finish.");
      t.join();
      LOG.info("Close finished.");

      // We expect that close will get a "Lease mismatch"
      // error.
      Throwable thrownByClose = err.get();
      assertNotNull(thrownByClose);
      assertTrue(thrownByClose instanceof IOException);
      if (!thrownByClose.getMessage().contains(
            "Lease mismatch"))
        throw thrownByClose;

      // The appender should be able to close properly
      appenderStream.close();
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test that neededReplications is updated for the appended block.
   */
  @Test(timeout = 60000)
  public void testUpdateNeededReplicationsForAppendedFile() throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
        .build();
    DistributedFileSystem fileSystem = null;
    try {
      // create a file.
      fileSystem = cluster.getFileSystem();
      Path f = new Path("/testAppend");
      FSDataOutputStream create = fileSystem.create(f, (short) 2);
      create.write("/testAppend".getBytes());
      create.close();

      // Append to the file.
      FSDataOutputStream append = fileSystem.append(f);
      append.write("/testAppend".getBytes());
      append.close();
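      // The file asks for replication 2 but only one DN is running, so the
      // appended block stays under-replicated until another DN joins.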

      // Start a new datanode
      cluster.startDataNodes(conf, 1, true, null, null);

      // Check for replications
      DFSTestUtil.waitReplication(fileSystem, f, (short) 2);
    } finally {
      if (null != fileSystem) {
        fileSystem.close();
      }
      cluster.shutdown();
    }
  }

  /**
   * Test that an append with no locations fails with an exception
   * showing insufficient locations.
   */
  @Test(timeout = 60000)
  public void testAppendInsufficientLocations() throws Exception {
    Configuration conf = new Configuration();

    // lower heartbeat interval for fast recognition of DN death
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        1000);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000);

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
        .build();
    DistributedFileSystem fileSystem = null;
    try {
      // create a file with replication 2
      fileSystem = cluster.getFileSystem();
      Path f = new Path("/testAppend");
      FSDataOutputStream create = fileSystem.create(f, (short) 2);
      create.write("/testAppend".getBytes());
      create.close();

      // Check for replications
      DFSTestUtil.waitReplication(fileSystem, f, (short) 2);

      // Shut down all DNs that have the last block location for the file
      LocatedBlocks lbs = fileSystem.dfs.getNamenode().
          getBlockLocations("/testAppend", 0, Long.MAX_VALUE);
      List<DataNode> dnsOfCluster = cluster.getDataNodes();
      DatanodeInfo[] dnsWithLocations = lbs.getLastLocatedBlock().
          getLocations();
      for (DataNode dn : dnsOfCluster) {
        for (DatanodeInfo loc : dnsWithLocations) {
          if (dn.getDatanodeId().equals(loc)) {
            dn.shutdown();
            DFSTestUtil.waitForDatanodeDeath(dn);
          }
        }
      }

      // Wait till 0 replication is recognized
      DFSTestUtil.waitReplication(fileSystem, f, (short) 0);

      // Append to the file. At this point the remaining live DNs do not
      // hold any replica of the block, so the append cannot be set up.
      try {
        fileSystem.append(f);
        fail("Append should fail because insufficient locations");
      } catch (IOException e) {
        LOG.info("Expected exception: ", e);
      }
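      // A failed append must not leave the file in the under-construction
      // state; otherwise later readers and writers would see the file as
      // still open.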
      FSDirectory dir = cluster.getNamesystem().getFSDirectory();
      final INodeFile inode = INodeFile.
          valueOf(dir.getINode("/testAppend"), "/testAppend");
      assertTrue("File should remain closed", !inode.isUnderConstruction());
    } finally {
      if (null != fileSystem) {
        fileSystem.close();
      }
      cluster.shutdown();
    }
  }
}