/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

/**
 * This class tests if EnhancedByteBufferAccess works correctly.
 */
public class TestEnhancedByteBufferAccess {
  private static final Log LOG =
      LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());

  static private TemporarySocketDirectory sockDir;

  static private CacheManipulator prevCacheManipulator;

  @BeforeClass
  public static void init() {
    sockDir = new TemporarySocketDirectory();
    DomainSocket.disableBindPathValidation();
    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
    NativeIO.POSIX.setCacheManipulator(new CacheManipulator() {
      @Override
      public void mlock(String identifier,
          ByteBuffer mmap, long length) throws IOException {
        LOG.info("mlocking " + identifier);
      }
    });
  }

  @AfterClass
  public static void teardown() {
    // Restore the original CacheManipulator
    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
  }

  private static byte[] byteBufferToArray(ByteBuffer buf) {
    byte resultArray[] = new byte[buf.remaining()];
    buf.get(resultArray);
    buf.flip();
    return resultArray;
  }

  private static final int BLOCK_SIZE =
      (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();

  public static HdfsConfiguration initZeroCopyTest() {
    Assume.assumeTrue(NativeIO.isAvailable());
    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        new File(sockDir.getDir(),
          "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
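    // Note: the DataNode substitutes its own TCP port for the "_PORT" token
    // in the domain socket path above, so each DataNode gets its own socket.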
    conf.setBoolean(DFSConfigKeys.
        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
    return conf;
  }

  @Test
  public void testZeroCopyReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;

    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      ByteBuffer result = fsIn.read(null, BLOCK_SIZE,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
          byteBufferToArray(result));
      fsIn.releaseBuffer(result);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  @Test
  public void testShortZeroCopyReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;

    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
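      // A zero-copy read is served from a single mmap'ed block, so it may
      // return fewer bytes than requested; it never spans a block boundary.
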
      // Try to read (2 * ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      ByteBuffer result =
          dfsIn.read(null, 2 * BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
          byteBufferToArray(result));
      dfsIn.releaseBuffer(result);

      // Try to read (1 + ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
      result =
          dfsIn.read(null, 1 + BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, BLOCK_SIZE, 2 * BLOCK_SIZE),
          byteBufferToArray(result));
      dfsIn.releaseBuffer(result);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  @Test
  public void testZeroCopyReadsNoFallback() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;

    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      ByteBuffer result;
      try {
        result = dfsIn.read(null, BLOCK_SIZE + 1, EnumSet.noneOf(ReadOption.class));
        Assert.fail("expected UnsupportedOperationException");
      } catch (UnsupportedOperationException e) {
        // expected
      }
      result = dfsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
          byteBufferToArray(result));
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  private static class CountingVisitor implements CacheVisitor {
    private final int expectedNumOutstandingMmaps;
    private final int expectedNumReplicas;
    private final int expectedNumEvictable;
    private final int expectedNumMmapedEvictable;
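    // A negative expected value means "do not assert on that count".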
    CountingVisitor(int expectedNumOutstandingMmaps,
        int expectedNumReplicas, int expectedNumEvictable,
        int expectedNumMmapedEvictable) {
      this.expectedNumOutstandingMmaps = expectedNumOutstandingMmaps;
      this.expectedNumReplicas = expectedNumReplicas;
      this.expectedNumEvictable = expectedNumEvictable;
      this.expectedNumMmapedEvictable = expectedNumMmapedEvictable;
    }

    @Override
    public void visit(int numOutstandingMmaps,
        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
        Map<ExtendedBlockId, InvalidToken> failedLoads,
        Map<Long, ShortCircuitReplica> evictable,
        Map<Long, ShortCircuitReplica> evictableMmapped) {
      if (expectedNumOutstandingMmaps >= 0) {
        Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
      }
      if (expectedNumReplicas >= 0) {
        Assert.assertEquals(expectedNumReplicas, replicas.size());
      }
      if (expectedNumEvictable >= 0) {
        Assert.assertEquals(expectedNumEvictable, evictable.size());
      }
      if (expectedNumMmapedEvictable >= 0) {
        Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
      }
    }
  }

  @Test
  public void testZeroCopyMmapCache() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 5 * BLOCK_SIZE;
    final int RANDOM_SEED = 23453;
    final String CONTEXT = "testZeroCopyMmapCacheContext";
    FSDataInputStream fsIn = null;
    ByteBuffer results[] = { null, null, null, null };

    DistributedFileSystem fs = null;
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    try {
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }
    fsIn = fs.open(TEST_PATH);
    byte original[] = new byte[TEST_FILE_LENGTH];
    IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
    fsIn.close();
    fsIn = fs.open(TEST_PATH);
    final ShortCircuitCache cache = ClientContext.get(
        CONTEXT, new DFSClient.Conf(conf)).getShortCircuitCache();
    cache.accept(new CountingVisitor(0, 5, 5, 0));
    results[0] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    fsIn.seek(0);
    results[1] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
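    // Both reads above come from the first block (we seek back to 0 between
    // them), so they should share a single mmap.
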
    // The mmap should be of the first block of the file.
    final ExtendedBlock firstBlock =
        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
    cache.accept(new CacheVisitor() {
      @Override
      public void visit(int numOutstandingMmaps,
          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
          Map<ExtendedBlockId, InvalidToken> failedLoads,
          Map<Long, ShortCircuitReplica> evictable,
          Map<Long, ShortCircuitReplica> evictableMmapped) {
        ShortCircuitReplica replica = replicas.get(
            new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
        Assert.assertNotNull(replica);
        Assert.assertTrue(replica.hasMmap());
        // The replica should not yet be evictable, since we have it open.
        Assert.assertNull(replica.getEvictableTimeNs());
      }
    });

    // Read more blocks.
    results[2] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    results[3] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));

    // We should now have 3 outstanding mmaps and 2 evictable replicas.
    cache.accept(new CountingVisitor(3, 5, 2, 0));

    // After we close the cursors, the mmaps should be evictable for
    // a brief period of time.  Then, they should be closed (we're
    // using a very quick timeout)
    for (ByteBuffer buffer : results) {
      if (buffer != null) {
        fsIn.releaseBuffer(buffer);
      }
    }
    fsIn.close();
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      public Boolean get() {
        final MutableBoolean finished = new MutableBoolean(false);
        cache.accept(new CacheVisitor() {
          @Override
          public void visit(int numOutstandingMmaps,
              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
              Map<ExtendedBlockId, InvalidToken> failedLoads,
              Map<Long, ShortCircuitReplica> evictable,
              Map<Long, ShortCircuitReplica> evictableMmapped) {
            finished.setValue(evictableMmapped.isEmpty());
          }
        });
        return finished.booleanValue();
      }
    }, 10, 60000);

    cache.accept(new CountingVisitor(0, -1, -1, -1));

    fs.close();
    cluster.shutdown();
  }

  /**
   * Test HDFS fallback reads.  HDFS streams support the ByteBufferReadable
   * interface.
   */
  @Test
  public void testHdfsFallbackReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;

    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      testFallbackImpl(fsIn, original);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  private static class RestrictedAllocatingByteBufferPool
      implements ByteBufferPool {
    private final boolean direct;

    RestrictedAllocatingByteBufferPool(boolean direct) {
      this.direct = direct;
    }
    @Override
    public ByteBuffer getBuffer(boolean direct, int length) {
      Preconditions.checkArgument(this.direct == direct);
      return direct ? ByteBuffer.allocateDirect(length) :
          ByteBuffer.allocate(length);
    }
    @Override
    public void putBuffer(ByteBuffer buffer) {
    }
  }

  private static void testFallbackImpl(InputStream stream,
      byte original[]) throws Exception {
    RestrictedAllocatingByteBufferPool bufferPool =
        new RestrictedAllocatingByteBufferPool(
            stream instanceof ByteBufferReadable);

    ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
    Assert.assertEquals(10, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
        byteBufferToArray(result));

    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
    Assert.assertEquals(5000, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
        byteBufferToArray(result));

    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
    Assert.assertEquals(11375, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
        byteBufferToArray(result));
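    // 10 + 5000 + 11375 == 16385 == TEST_FILE_LENGTH, so the stream is now
    // at EOF and the next fallback read should return null.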
    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
    Assert.assertNull(result);
  }

  /**
   * Test the {@link ByteBufferUtil#fallbackRead} function directly.
   */
  @Test
  public void testFallbackRead() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;

    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      testFallbackImpl(fsIn, original);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  /**
   * Test fallback reads on a stream which does not support the
   * ByteBufferReadable interface.
   */
  @Test
  public void testIndirectFallbackReads() throws Exception {
    final File TEST_DIR = new File(
        System.getProperty("test.build.data", "build/test/data"));
    final String TEST_PATH = TEST_DIR + File.separator +
        "indirectFallbackTestFile";
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FileOutputStream fos = null;
    FileInputStream fis = null;
    try {
      fos = new FileOutputStream(TEST_PATH);
      Random random = new Random(RANDOM_SEED);
      byte original[] = new byte[TEST_FILE_LENGTH];
      random.nextBytes(original);
      fos.write(original);
      fos.close();
      fos = null;
      fis = new FileInputStream(TEST_PATH);
      testFallbackImpl(fis, original);
    } finally {
      IOUtils.cleanup(LOG, fos, fis);
      new File(TEST_PATH).delete();
    }
  }

  /**
   * Test that we can zero-copy read cached data even without disabling
   * checksums.
   */
  @Test(timeout=120000)
  public void testZeroCopyReadOfCachedData() throws Exception {
    BlockReaderTestUtil.enableShortCircuitShmTracing();
    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
    BlockReaderTestUtil.enableHdfsCachingTracing();

    final int TEST_FILE_LENGTH = BLOCK_SIZE;
    final Path TEST_PATH = new Path("/a");
    final int RANDOM_SEED = 23453;
    HdfsConfiguration conf = initZeroCopyTest();
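    // Re-enable checksums: zero-copy reads of cached data should work even
    // when checksum verification is on.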
    conf.setBoolean(DFSConfigKeys.
        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    final String CONTEXT = "testZeroCopyReadOfCachedData";
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH,
          (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize()));
    MiniDFSCluster cluster = null;
    ByteBuffer result = null, result2 = null;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
    DistributedFileSystem fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    byte original[] = DFSTestUtil.
        calculateFileContentsFromSeed(RANDOM_SEED, TEST_FILE_LENGTH);

    // Prior to caching, the file can't be read via zero-copy
    FSDataInputStream fsIn = fs.open(TEST_PATH);
    try {
      result = fsIn.read(null, TEST_FILE_LENGTH / 2,
          EnumSet.noneOf(ReadOption.class));
      Assert.fail("expected UnsupportedOperationException");
    } catch (UnsupportedOperationException e) {
      // expected
    }
    // Cache the file
    fs.addCachePool(new CachePoolInfo("pool1"));
    long directiveId = fs.addCacheDirective(new CacheDirectiveInfo.Builder().
        setPath(TEST_PATH).
        setReplication((short)1).
        setPool("pool1").
        build());
    int numBlocks = (int)Math.ceil((double)TEST_FILE_LENGTH / BLOCK_SIZE);
    DFSTestUtil.verifyExpectedCacheUsage(
        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, BLOCK_SIZE),
        numBlocks, cluster.getDataNodes().get(0).getFSDataset());
    try {
      result = fsIn.read(null, TEST_FILE_LENGTH,
          EnumSet.noneOf(ReadOption.class));
    } catch (UnsupportedOperationException e) {
      Assert.fail("expected to be able to read cached file via zero-copy");
    }
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
        BLOCK_SIZE), byteBufferToArray(result));
    // Test that files opened after the cache operation has finished
    // still get the benefits of zero-copy (regression test for HDFS-6086)
    FSDataInputStream fsIn2 = fs.open(TEST_PATH);
    try {
      result2 = fsIn2.read(null, TEST_FILE_LENGTH,
          EnumSet.noneOf(ReadOption.class));
    } catch (UnsupportedOperationException e) {
      Assert.fail("expected to be able to read cached file via zero-copy");
    }
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
        BLOCK_SIZE), byteBufferToArray(result2));
    fsIn2.releaseBuffer(result2);
    fsIn2.close();

    // check that the replica is anchored
    final ExtendedBlock firstBlock =
        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
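    // A client "anchors" an mlocked replica while it holds a no-checksum
    // mmap of it; the DataNode cannot uncache the block until the anchor
    // is released.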
    final ShortCircuitCache cache = ClientContext.get(
        CONTEXT, new DFSClient.Conf(conf)).getShortCircuitCache();
    waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
    // Uncache the replica
    fs.removeCacheDirective(directiveId);
    waitForReplicaAnchorStatus(cache, firstBlock, false, true, 1);
    fsIn.releaseBuffer(result);
    waitForReplicaAnchorStatus(cache, firstBlock, false, false, 1);
    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

    fsIn.close();
    fs.close();
    cluster.shutdown();
  }

  private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
      final ExtendedBlock block, final boolean expectedIsAnchorable,
      final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
          throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        final MutableBoolean result = new MutableBoolean(false);
        cache.accept(new CacheVisitor() {
          @Override
          public void visit(int numOutstandingMmaps,
              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
              Map<ExtendedBlockId, InvalidToken> failedLoads,
              Map<Long, ShortCircuitReplica> evictable,
              Map<Long, ShortCircuitReplica> evictableMmapped) {
            Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
            ShortCircuitReplica replica =
                replicas.get(ExtendedBlockId.fromExtendedBlock(block));
            Assert.assertNotNull(replica);
            Slot slot = replica.getSlot();
            if ((expectedIsAnchorable != slot.isAnchorable()) ||
                (expectedIsAnchored != slot.isAnchored())) {
              LOG.info("replica " + replica + " has isAnchorable = " +
                  slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() +
                  ".  Waiting for isAnchorable = " + expectedIsAnchorable +
                  ", isAnchored = " + expectedIsAnchored);
              return;
            }
            result.setValue(true);
          }
        });
        return result.toBoolean();
      }
    }, 10, 60000);
  }

  @Test
  public void testClientMmapDisable() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, false);
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    final String CONTEXT = "testClientMmapDisable";
    FSDataInputStream fsIn = null;
    DistributedFileSystem fs = null;
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);

    try {
      // With DFS_CLIENT_MMAP_ENABLED set to false, we should not do memory
      // mapped reads.
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      fsIn = fs.open(TEST_PATH);
      try {
        fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        Assert.fail("expected zero-copy read to fail when client mmaps " +
            "were disabled.");
      } catch (UnsupportedOperationException e) {
      }
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }

    fsIn = null;
    fs = null;
    cluster = null;
    try {
      // Now try again with DFS_CLIENT_MMAP_CACHE_SIZE == 0.  It should work.
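      // (A zero mmap cache size only disables caching of mmaps for reuse;
      // zero-copy reads themselves still work.)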
      conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, true);
      conf.setInt(DFS_CLIENT_MMAP_CACHE_SIZE, 0);
      conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      fsIn = fs.open(TEST_PATH);
      ByteBuffer buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      fsIn.releaseBuffer(buf);
      // Test EOF behavior
      IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
      buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(null, buf);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  @Test
  public void test2GBMmapLimit() throws Exception {
    Assume.assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles());
    HdfsConfiguration conf = initZeroCopyTest();
    final long TEST_FILE_LENGTH = 2469605888L;
    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TEST_FILE_LENGTH);
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final String CONTEXT = "test2GBMmapLimit";
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);

    FSDataInputStream fsIn = null, fsIn2 = null;
    ByteBuffer buf1 = null, buf2 = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 0xB);
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);

      fsIn = fs.open(TEST_PATH);
      buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(1, buf1.remaining());
      fsIn.releaseBuffer(buf1);
      buf1 = null;
      fsIn.seek(2147483640L);
      buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(7, buf1.remaining());
      Assert.assertEquals(Integer.MAX_VALUE, buf1.limit());
      fsIn.releaseBuffer(buf1);
      buf1 = null;
      Assert.assertEquals(2147483647L, fsIn.getPos());
      try {
        buf1 = fsIn.read(null, 1024,
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        Assert.fail("expected UnsupportedOperationException");
      } catch (UnsupportedOperationException e) {
        // expected; can't read past 2GB boundary.
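        // (A single mmap'ed read is capped at Integer.MAX_VALUE bytes
        // because java.nio.ByteBuffer is indexed by int.)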
      }
      fsIn.close();
      fsIn = null;

      // Now create another file with normal-sized blocks, and verify we
      // can read past 2GB
      final Path TEST_PATH2 = new Path("/b");
      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 268435456L);
      DFSTestUtil.createFile(fs, TEST_PATH2, 1024 * 1024, TEST_FILE_LENGTH,
          268435456L, (short)1, 0xA);

      fsIn2 = fs.open(TEST_PATH2);
      fsIn2.seek(2147483640L);
      buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(8, buf2.remaining());
      Assert.assertEquals(2147483648L, fsIn2.getPos());
      fsIn2.releaseBuffer(buf2);
      buf2 = null;
      buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(1024, buf2.remaining());
      Assert.assertEquals(2147484672L, fsIn2.getPos());
      fsIn2.releaseBuffer(buf2);
      buf2 = null;
    } finally {
      if (buf1 != null) {
        fsIn.releaseBuffer(buf1);
      }
      if (buf2 != null) {
        fsIn2.releaseBuffer(buf2);
      }
      IOUtils.cleanup(null, fsIn, fsIn2);
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}