/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.EnumSet;

import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;

/** This class contains a set of tests to verify the correctness of the
 * newly introduced {@link FSDataOutputStream#hflush()} method. */
public class TestHFlush {
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

  private final String fName = "hflushtest.dat";

  /**
   * The test uses
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * to write a file with a standard block size.
   */
  @Test
  public void hFlush_01() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, false, EnumSet.noneOf(SyncFlag.class));
  }

  /**
   * The test uses
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * to write a file with a custom block size so the writes will be
   * happening across block boundaries.
   */
  @Test
  public void hFlush_02() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, false,
        EnumSet.noneOf(SyncFlag.class));
  }
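
  // In brief: once hflush() returns, everything written so far has been
  // acknowledged by every datanode in the write pipeline and is visible to
  // readers that open the file afterwards, though it is not guaranteed to be
  // on the datanodes' disks yet. A minimal sketch of the call pattern these
  // tests exercise ("fs", "data" and the path are hypothetical):
  //
  //   FSDataOutputStream out = fs.create(new Path("/example.dat"));
  //   out.write(data);
  //   out.hflush();  // new readers can see the data before close()
  //   out.close();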

  /**
   * The test uses
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * to write a file with a custom block size so the writes will be
   * happening across block and checksum boundaries.
   */
  @Test
  public void hFlush_03() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 400;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, false,
        EnumSet.noneOf(SyncFlag.class));
  }

  /**
   * Test hsync (with updating block length in NameNode) while no data is
   * actually written yet.
   */
  @Test
  public void hSyncUpdateLength_00() throws IOException {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
        2).build();
    DistributedFileSystem fileSystem =
        cluster.getFileSystem();

    try {
      Path path = new Path(fName);
      FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
          AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.UPDATE_LENGTH));
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      stm.close();
    } finally {
      fileSystem.close();
      cluster.shutdown();
    }
  }

  /**
   * Test hsync with END_BLOCK flag.
   */
  @Test
  public void hSyncEndBlock_00() throws IOException {
    final int preferredBlockSize = 1024;
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
        .build();
    DistributedFileSystem fileSystem = cluster.getFileSystem();
    FSDataOutputStream stm = null;
    try {
      Path path = new Path("/" + fName);
      stm = fileSystem.create(path, true, 4096, (short) 2,
          AppendTestUtil.BLOCK_SIZE);
      System.out.println("Created file " + path.toString());
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.END_BLOCK));
      long currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(0L, currentFileLength);
      LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(0, blocks.getLocatedBlocks().size());

      // write a block and call hsync(end_block) at the block boundary
      stm.write(new byte[preferredBlockSize]);
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.END_BLOCK));
      currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(preferredBlockSize, currentFileLength);
      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(1, blocks.getLocatedBlocks().size());

      // call hsync then call hsync(end_block) immediately
      stm.write(new byte[preferredBlockSize / 2]);
      stm.hsync();
      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
          .of(SyncFlag.END_BLOCK));
      currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(preferredBlockSize + preferredBlockSize / 2,
          currentFileLength);
      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(2, blocks.getLocatedBlocks().size());

      stm.write(new byte[preferredBlockSize / 4]);
      stm.hsync();
      currentFileLength = fileSystem.getFileStatus(path).getLen();
      assertEquals(preferredBlockSize + preferredBlockSize / 2
          + preferredBlockSize / 4, currentFileLength);
      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
      assertEquals(3, blocks.getLocatedBlocks().size());
    } finally {
      IOUtils.cleanup(null, stm, fileSystem);
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
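
  // hsync() is hflush() plus a sync on each datanode, so the flushed data
  // should also survive a datanode restart. The SyncFlags refine what else
  // happens: UPDATE_LENGTH additionally propagates the current length to the
  // NameNode, so getFileStatus() reflects the synced bytes (as
  // hSyncUpdateLength_00 checks), and END_BLOCK closes the current block so
  // the next write starts a new one (as hSyncEndBlock_00 checks). A sketch
  // of the combined call on an open stream like the tests' "stm":
  //
  //   ((DFSOutputStream) stm.getWrappedStream()).hsync(
  //       EnumSet.of(SyncFlag.UPDATE_LENGTH, SyncFlag.END_BLOCK));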

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
   */
  @Test
  public void hSyncUpdateLength_01() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantic of {@link SyncFlag#END_BLOCK}.
   */
  @Test
  public void hSyncEndBlock_01() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantic of {@link SyncFlag#END_BLOCK} and
   * {@link SyncFlag#UPDATE_LENGTH}.
   */
  @Test
  public void hSyncEndBlockAndUpdateLength() throws IOException {
    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
        (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
  }

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
   * Similar to {@link #hFlush_02()}, it writes a file with a custom block
   * size so the writes will be happening across block boundaries.
   */
  @Test
  public void hSyncUpdateLength_02() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.UPDATE_LENGTH));
  }

  @Test
  public void hSyncEndBlock_02() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 512;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.END_BLOCK));
  }
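
  // The *_02 and *_03 variants shrink the block size to exactly three
  // checksum chunks (3 * 512 and 3 * 400 bytes respectively), so that
  // doTheJob()'s fixed-size writes land across block boundaries and, with
  // the 400-byte chunk, across checksum-chunk boundaries as well.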

  /**
   * The test calls
   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
   * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
   * Similar to {@link #hFlush_03()}, it writes a file with a custom block
   * size so the writes will be happening across block and checksum
   * boundaries.
   */
  @Test
  public void hSyncUpdateLength_03() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 400;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.UPDATE_LENGTH));
  }

  @Test
  public void hSyncEndBlock_03() throws IOException {
    Configuration conf = new HdfsConfiguration();
    int customPerChecksumSize = 400;
    int customBlockSize = customPerChecksumSize * 3;
    // Modify default filesystem settings
    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);

    doTheJob(conf, fName, customBlockSize, (short) 2, true,
        EnumSet.of(SyncFlag.END_BLOCK));
  }
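
  // doTheJob() below is the shared driver for the tests above: it writes the
  // file in ten equal sections, calls hflush() or hsync(syncFlags) after each
  // one, optionally checks the NameNode-visible length (UPDATE_LENGTH) or the
  // block count (END_BLOCK), and re-opens the file with a fresh reader to
  // confirm that the just-flushed section is already readable.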

  /**
   * The method starts a new cluster with the given Configuration; creates a
   * file with the specified block_size and writes 10 equal sections into it;
   * it also calls hflush/hsync after each write and throws an IOException in
   * case of an error.
   *
   * @param conf cluster configuration
   * @param fileName of the file to be created and processed as required
   * @param block_size value to be used for the file's creation
   * @param replicas is the number of replicas
   * @param isSync hsync or hflush
   * @param syncFlags specify the semantic of the sync/flush
   * @throws IOException in case of any errors
   */
  public static void doTheJob(Configuration conf, final String fileName,
      long block_size, short replicas, boolean isSync,
      EnumSet<SyncFlag> syncFlags) throws IOException {
    byte[] fileContent;
    final int SECTIONS = 10;

    fileContent = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(replicas).build();
    // Make sure we work with DFS in order to utilize all its functionality
    DistributedFileSystem fileSystem = cluster.getFileSystem();

    FSDataInputStream is;
    try {
      Path path = new Path(fileName);
      final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
          .toUri().getPath();
      FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
          block_size);
      System.out.println("Created file " + fileName);

      int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
      int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
      for (int i=0; i<SECTIONS; i++) {
        System.out.println("Writing " + (tenth * i) + " to "
            + (tenth * (i + 1)) + " section to file " + fileName);
        // write to the file
        stm.write(fileContent, tenth * i, tenth);

        // Wait while hflush/hsync pushes all packets through the built pipeline
        if (isSync) {
          ((DFSOutputStream)stm.getWrappedStream()).hsync(syncFlags);
        } else {
          ((DFSOutputStream)stm.getWrappedStream()).hflush();
        }

        // Check the file length if UPDATE_LENGTH is required
        if (isSync && syncFlags.contains(SyncFlag.UPDATE_LENGTH)) {
          long currentFileLength = fileSystem.getFileStatus(path).getLen();
          assertEquals(
              "File size doesn't match for hsync/hflush with updating the length",
              tenth * (i + 1), currentFileLength);
        } else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
          LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
          assertEquals(i + 1, blocks.getLocatedBlocks().size());
        }

        byte [] toRead = new byte[tenth];
        byte [] expected = new byte[tenth];
        System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
        // Open the same file for read. Need to create a new reader after every write operation(!)
        is = fileSystem.open(path);
        is.seek(tenth * i);
        int readBytes = is.read(toRead, 0, tenth);
        System.out.println("Has read " + readBytes);
        assertTrue("Should've read some bytes", (readBytes > 0) && (readBytes <= tenth));
        is.close();
        checkData(toRead, 0, readBytes, expected, "Partial verification");
      }
      System.out.println("Writing " + (tenth * SECTIONS) + " to "
          + (tenth * SECTIONS + rounding) + " section to file " + fileName);
      stm.write(fileContent, tenth * SECTIONS, rounding);
      stm.close();

      assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE,
          fileSystem.getFileStatus(path).getLen());
      AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length,
          fileContent, "hflush()");
    } finally {
      fileSystem.close();
      cluster.shutdown();
    }
  }

  static void checkData(final byte[] actual, int from, int len,
      final byte[] expected, String message) {
    for (int idx = 0; idx < len; idx++) {
      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
          expected[from+idx]+" actual "+actual[idx],
          expected[from+idx], actual[idx]);
      actual[idx] = 0;
    }
  }
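
  // While the client is idle between writes, the output stream is expected
  // to keep the datanode pipeline alive with heartbeat packets.
  // testPipelineHeartbeat() below sets the client socket timeout to 2
  // seconds and then sleeps for that long between one-byte writes; without
  // the heartbeats the pipeline would time out and the writes or the final
  // close() would fail.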

  /** This creates a slow writer and checks to see
   * if pipeline heartbeats work fine
   */
  @Test
  public void testPipelineHeartbeat() throws Exception {
    final int DATANODE_NUM = 2;
    final int fileLen = 6;
    Configuration conf = new HdfsConfiguration();
    final int timeout = 2000;
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
        timeout);

    final Path p = new Path("/pipelineHeartbeat/foo");
    System.out.println("p=" + p);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();

      byte[] fileContents = AppendTestUtil.initBuffer(fileLen);

      // create a new file.
      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);

      stm.write(fileContents, 0, 1);
      Thread.sleep(timeout);
      stm.hflush();
      System.out.println("Wrote 1 byte and hflush " + p);

      // write another byte
      Thread.sleep(timeout);
      stm.write(fileContents, 1, 1);
      stm.hflush();

      stm.write(fileContents, 2, 1);
      Thread.sleep(timeout);
      stm.hflush();

      stm.write(fileContents, 3, 1);
      Thread.sleep(timeout);
      stm.write(fileContents, 4, 1);
      stm.hflush();

      stm.write(fileContents, 5, 1);
      Thread.sleep(timeout);
      stm.close();

      // verify that the entire file is good
      AppendTestUtil.checkFullFile(fs, p, fileLen,
          fileContents, "Failed to slowly write to a file");
    } finally {
      cluster.shutdown();
    }
  }

  @Test
  public void testHFlushInterrupted() throws Exception {
    final int DATANODE_NUM = 2;
    final int fileLen = 6;
    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
    Configuration conf = new HdfsConfiguration();
    final Path p = new Path("/hflush-interrupted");

    System.out.println("p=" + p);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();

      // create a new file.
      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);

      stm.write(fileContents, 0, 2);
      Thread.currentThread().interrupt();
      try {
        stm.hflush();
        // If we made it past the hflush(), then that means that the ack made it back
        // from the pipeline before we got to the wait() call. In that case we should
        // still have interrupted status.
        assertTrue(Thread.interrupted());
      } catch (InterruptedIOException ie) {
        System.out.println("Got expected exception during flush");
      }
      assertFalse(Thread.interrupted());

      // Flushing again should succeed since we no longer have interrupted status
      stm.hflush();

      // Write some more data and flush
      stm.write(fileContents, 2, 2);
      stm.hflush();

      // Write some data and close while interrupted

      stm.write(fileContents, 4, 2);
      Thread.currentThread().interrupt();
      try {
        stm.close();
        // If we made it past the close(), then that means that the ack made it back
        // from the pipeline before we got to the wait() call. In that case we should
        // still have interrupted status.
        assertTrue(Thread.interrupted());
      } catch (InterruptedIOException ioe) {
        System.out.println("Got expected exception during close");
        // If we got the exception, we shouldn't have interrupted status anymore.
        assertFalse(Thread.interrupted());

        // Now do a successful close.
        stm.close();
      }

      // verify that the entire file is good
      AppendTestUtil.checkFullFile(fs, p, 4, fileContents,
          "Failed to deal with thread interruptions", false);
    } finally {
      cluster.shutdown();
    }
  }
}