/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.EnumSet;

import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;

/** This class contains a set of tests to verify the correctness of the
 * newly introduced {@link FSDataOutputStream#hflush()} method */
public class TestHFlush {
  {
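    // Run these tests with verbose DataNode and DFSClient logging so that
    // pipeline behavior is easier to debug from the test output.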
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

  private final String fName = "hflushtest.dat";

  /**
   * The test uses
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * to write a file with a standard block size
   */
  @Test
  public void hFlush_01() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, false, EnumSet.noneOf(SyncFlag.class));
  }

  /**
   * The test uses
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * to write a file with a custom block size so the writes will be
   * happening across block boundaries
   */
  @Test
  public void hFlush_02() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, false,
        EnumSet.noneOf(SyncFlag.class));
  }

  /**
   * The test uses
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * to write a file with a custom block size so the writes will be
   * happening across block and checksum boundaries
   */
  @Test
  public void hFlush_03() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 400;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, false,
        EnumSet.noneOf(SyncFlag.class));
  }

  /**
   * Test hsync (with updating block length in NameNode) while no data is
   * actually written yet
   */
  @Test
  public void hSyncUpdateLength_00() throws IOException {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
        2).build();
    DistributedFileSystem fileSystem =
        cluster.getFileSystem();

    try {
      Path path = new Path(fName);
      FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
          AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
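      // hsync with UPDATE_LENGTH before any data is written: the length
      // visible through the NameNode should still be 0.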
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.UPDATE_LENGTH));
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      stm.close();
    } finally {
      fileSystem.close();
      cluster.shutdown();
    }
  }

  /**
   * Test hsync with END_BLOCK flag.
   */
  @Test
  public void hSyncEndBlock_00() throws IOException {
    final int preferredBlockSize = 1024;
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
        .build();
    DistributedFileSystem fileSystem = cluster.getFileSystem();
    FSDataOutputStream stm = null;
    try {
      Path path = new Path("/" + fName);
      stm = fileSystem.create(path, true, 4096, (short) 2,
          AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
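      // hsync with END_BLOCK before any data is written: no block should have
      // been allocated and the file length should still be 0.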
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.END_BLOCK));
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(0, blocks.getLocatedBlocks().size());

      // write a block and call hsync(end_block) at the block boundary
      stm.write(new byte[preferredBlockSize]);
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.END_BLOCK));
      currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(preferredBlockSize, currentFileLength);
      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(1, blocks.getLocatedBlocks().size());

      // call hsync then call hsync(end_block) immediately
      stm.write(new byte[preferredBlockSize / 2]);
      stm.hsync();
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.END_BLOCK));
      currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(preferredBlockSize + preferredBlockSize / 2,
          currentFileLength);
      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(2, blocks.getLocatedBlocks().size());

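      // a plain hsync (without END_BLOCK) should not end the current block:
      // the new data lands in a third, still-open block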
      stm.write(new byte[preferredBlockSize / 4]);
      stm.hsync();
      currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(preferredBlockSize + preferredBlockSize / 2
          + preferredBlockSize / 4, currentFileLength);
      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(3, blocks.getLocatedBlocks().size());
    } finally {
      IOUtils.cleanup(null, stm, fileSystem);
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantics of {@link SyncFlag#UPDATE_LENGTH}.
   */
  @Test
  public void hSyncUpdateLength_01() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantics of {@link SyncFlag#END_BLOCK}.
   */
  @Test
  public void hSyncEndBlock_01() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantics of {@link SyncFlag#END_BLOCK} and
   * {@link SyncFlag#UPDATE_LENGTH}.
   */
  @Test
  public void hSyncEndBlockAndUpdateLength() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantics of {@link SyncFlag#UPDATE_LENGTH}.
   * Similar to {@link #hFlush_02()}, it writes a file with a custom block
   * size so the writes will be happening across block boundaries
   */
  @Test
  public void hSyncUpdateLength_02() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.UPDATE_LENGTH));
  }

  @Test
  public void hSyncEndBlock_02() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.END_BLOCK));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantics of {@link SyncFlag#UPDATE_LENGTH}.
   * Similar to {@link #hFlush_03()}, it writes a file with a custom block
   * size so the writes will be happening across block and checksum
   * boundaries.
   */
  @Test
  public void hSyncUpdateLength_03() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 400;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.UPDATE_LENGTH));
  }

  @Test
  public void hSyncEndBlock_03() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 400;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.END_BLOCK));
  }

  /**
   * The method starts a new cluster with the given Configuration; creates a
   * file with the specified block size and writes 10 equal sections to it; it
   * also calls hflush/hsync after each write and throws an IOException in case
   * of an error.
   *
   * @param conf cluster configuration
   * @param fileName name of the file to be created and processed as required
   * @param block_size value to be used for the file's creation
   * @param replicas is the number of replicas
   * @param isSync use hsync if true, hflush otherwise
   * @param syncFlags specify the semantics of the sync/flush
   * @throws IOException in case of any errors
   */
  public static void doTheJob(Configuration conf, final String fileName,
      long block_size, short replicas, boolean isSync,
      EnumSet<SyncFlag> syncFlags) throws IOException {
    byte[] fileContent;
    final int SECTIONS = 10;

    fileContent = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                               .numDataNodes(replicas).build();
    // Make sure we work with DFS in order to utilize all its functionality
    DistributedFileSystem fileSystem = cluster.getFileSystem();

    FSDataInputStream is;
    try {
      Path path = new Path(fileName);
      final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
          .toUri().getPath();
      FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
          block_size);
      System.out.println("Created file " + fileName);

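      // Split the file contents into SECTIONS equal slices of 'tenth' bytes;
      // any remainder ('rounding') is written after the loop below.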
      int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
      int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
      for (int i=0; i<SECTIONS; i++) {
        System.out.println("Writing " + (tenth * i) + " to "
            + (tenth * (i + 1)) + " section to file " + fileName);
        // write to the file
        stm.write(fileContent, tenth * i, tenth);

        // Wait while hflush/hsync pushes all packets through the pipeline
        if (isSync) {
          ((DFSOutputStream)stm.getWrappedStream()).hsync(syncFlags);
        } else {
          ((DFSOutputStream)stm.getWrappedStream()).hflush();
        }

        // Check file length if updating the length is required
        if (isSync && syncFlags.contains(SyncFlag.UPDATE_LENGTH)) {
          long currentFileLength = fileSystem.getFileStatus(path).getLen();
          assertEquals(
            "File size doesn't match for hsync/hflush with updating the length",
            tenth * (i + 1), currentFileLength);
        } else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
          LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
          assertEquals(i + 1, blocks.getLocatedBlocks().size());
        }

        byte [] toRead = new byte[tenth];
        byte [] expected = new byte[tenth];
        System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
        // Open the same file for read. Need to create a new reader after every write operation(!)
        is = fileSystem.open(path);
        is.seek(tenth * i);
        int readBytes = is.read(toRead, 0, tenth);
        System.out.println("Has read " + readBytes);
        assertTrue("Should've read more bytes", (readBytes > 0) && (readBytes <= tenth));
        is.close();
        checkData(toRead, 0, readBytes, expected, "Partial verification");
      }
      System.out.println("Writing " + (tenth * SECTIONS) + " to "
          + (tenth * SECTIONS + rounding) + " section to file " + fileName);
      stm.write(fileContent, tenth * SECTIONS, rounding);
      stm.close();

      assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE,
          fileSystem.getFileStatus(path).getLen());
      AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length,
          fileContent, "hflush()");
    } finally {
      fileSystem.close();
      cluster.shutdown();
    }
  }
  static void checkData(final byte[] actual, int from, int len,
                        final byte[] expected, String message) {
    for (int idx = 0; idx < len; idx++) {
      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
                   expected[from+idx]+" actual "+actual[idx],
                   expected[from+idx], actual[idx]);
      actual[idx] = 0;
    }
  }

  /** This creates a slow writer and checks to see
   * if pipeline heartbeats work fine
   */
  @Test
  public void testPipelineHeartbeat() throws Exception {
    final int DATANODE_NUM = 2;
    final int fileLen = 6;
    Configuration conf = new HdfsConfiguration();
    final int timeout = 2000;
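    // The writer below sleeps for the full client socket timeout between
    // writes, so the pipeline would time out unless heartbeat packets keep it
    // alive while the stream is idle.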
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
        timeout);

    final Path p = new Path("/pipelineHeartbeat/foo");
    System.out.println("p=" + p);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();

      byte[] fileContents = AppendTestUtil.initBuffer(fileLen);

      // create a new file.
      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);

      stm.write(fileContents, 0, 1);
      Thread.sleep(timeout);
      stm.hflush();
      System.out.println("Wrote 1 byte and hflush " + p);

      // write another byte
      Thread.sleep(timeout);
      stm.write(fileContents, 1, 1);
      stm.hflush();

      stm.write(fileContents, 2, 1);
      Thread.sleep(timeout);
      stm.hflush();

      stm.write(fileContents, 3, 1);
      Thread.sleep(timeout);
      stm.write(fileContents, 4, 1);
      stm.hflush();

      stm.write(fileContents, 5, 1);
      Thread.sleep(timeout);
      stm.close();

      // verify that entire file is good
      AppendTestUtil.checkFullFile(fs, p, fileLen,
          fileContents, "Failed to slowly write to a file");
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testHFlushInterrupted() throws Exception {
    final int DATANODE_NUM = 2;
    final int fileLen = 6;
    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
    Configuration conf = new HdfsConfiguration();
    final Path p = new Path("/hflush-interrupted");

    System.out.println("p=" + p);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();

      // create a new file.
      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);

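      // Interrupt the writer thread before hflush(): the flush should either
      // complete normally (leaving the interrupt status set) or fail with an
      // InterruptedIOException (with the interrupt status already cleared).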
      stm.write(fileContents, 0, 2);
      Thread.currentThread().interrupt();
      try {
        stm.hflush();
        // If we made it past the hflush(), then that means that the ack made it back
        // from the pipeline before we got to the wait() call. In that case we should
        // still have interrupted status.
        assertTrue(Thread.interrupted());
      } catch (InterruptedIOException ie) {
        System.out.println("Got expected exception during flush");
      }
      assertFalse(Thread.interrupted());

      // Flushing again should succeed since we no longer have the interrupt status
      stm.hflush();

      // Write some more data and flush
      stm.write(fileContents, 2, 2);
      stm.hflush();

      // Write some data and close while interrupted

      stm.write(fileContents, 4, 2);
      Thread.currentThread().interrupt();
      try {
        stm.close();
        // If we made it past the close(), then that means that the ack made it back
        // from the pipeline before we got to the wait() call. In that case we should
        // still have interrupted status.
        assertTrue(Thread.interrupted());
      } catch (InterruptedIOException ioe) {
        System.out.println("Got expected exception during close");
        // If we got the exception, we shouldn't have interrupted status anymore.
        assertFalse(Thread.interrupted());

        // Now do a successful close.
        stm.close();
      }

      // verify that entire file is good
      AppendTestUtil.checkFullFile(fs, p, 4, fileContents,
          "Failed to deal with thread interruptions", false);
    } finally {
      cluster.shutdown();
    }
  }
}