/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

/**
 * This class tests if EnhancedByteBufferAccess works correctly.
 */
public class TestEnhancedByteBufferAccess {
  private static final Log LOG =
      LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());

  static private TemporarySocketDirectory sockDir;

  static private CacheManipulator prevCacheManipulator;

  @BeforeClass
  public static void init() {
    sockDir = new TemporarySocketDirectory();
    DomainSocket.disableBindPathValidation();
    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
    // Stub out mlock so the tests can run without real locked memory.
    NativeIO.POSIX.setCacheManipulator(new CacheManipulator() {
      @Override
      public void mlock(String identifier,
          ByteBuffer mmap, long length) throws IOException {
        LOG.info("mlocking " + identifier);
      }
    });
  }

  @AfterClass
  public static void teardown() {
    // Restore the original CacheManipulator
    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
  }

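  /**
   * Copy the remaining bytes of a ByteBuffer into a new array, then flip the
   * buffer so that it can be read again from the beginning.
   */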
  private static byte[] byteBufferToArray(ByteBuffer buf) {
    byte resultArray[] = new byte[buf.remaining()];
    buf.get(resultArray);
    buf.flip();
    return resultArray;
  }

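  // Use the operating system page size as the block size, since that is the
  // smallest granularity at which a block can be mmapped.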
  private static final int BLOCK_SIZE =
      (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();

  public static HdfsConfiguration initZeroCopyTest() {
    Assume.assumeTrue(NativeIO.isAvailable());
    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        new File(sockDir.getDir(),
          "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
    conf.setBoolean(DFSConfigKeys.
        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
    return conf;
  }

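  /**
   * Test that a basic zero-copy read returns the first block of the file and
   * that the read statistics record it as a zero-copy read.
   */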
  @Test
  public void testZeroCopyReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;

    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      ByteBuffer result = fsIn.read(null, BLOCK_SIZE,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
          byteBufferToArray(result));
      fsIn.releaseBuffer(result);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

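  /**
   * Test that a zero-copy read request which crosses a block boundary is
   * truncated at the block boundary rather than failing.
   */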
  @Test
  public void testShortZeroCopyReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;

    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);

      // Try to read (2 * ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      ByteBuffer result =
        dfsIn.read(null, 2 * BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
          byteBufferToArray(result));
      dfsIn.releaseBuffer(result);

      // Try to read (1 + ${BLOCK_SIZE}), but only get ${BLOCK_SIZE} because of the block size.
      result =
          dfsIn.read(null, 1 + BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, BLOCK_SIZE, 2 * BLOCK_SIZE),
          byteBufferToArray(result));
      dfsIn.releaseBuffer(result);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

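  /**
   * Test that a zero-copy read fails when no fallback buffer pool is supplied
   * and checksums are not skipped, and succeeds once SKIP_CHECKSUMS is set.
   */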
  @Test
  public void testZeroCopyReadsNoFallback() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 3 * BLOCK_SIZE;

    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      ByteBuffer result;
      try {
        result = dfsIn.read(null, BLOCK_SIZE + 1, EnumSet.noneOf(ReadOption.class));
        Assert.fail("expected UnsupportedOperationException");
      } catch (UnsupportedOperationException e) {
        // expected
      }
      result = dfsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(BLOCK_SIZE, result.remaining());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(BLOCK_SIZE,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, BLOCK_SIZE),
          byteBufferToArray(result));
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

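  /**
   * A CacheVisitor which checks the expected cache counts; expectations set
   * to a negative value are skipped.
   */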
  private static class CountingVisitor implements CacheVisitor {
    private final int expectedNumOutstandingMmaps;
    private final int expectedNumReplicas;
    private final int expectedNumEvictable;
    private final int expectedNumMmapedEvictable;

    CountingVisitor(int expectedNumOutstandingMmaps,
        int expectedNumReplicas, int expectedNumEvictable,
        int expectedNumMmapedEvictable) {
      this.expectedNumOutstandingMmaps = expectedNumOutstandingMmaps;
      this.expectedNumReplicas = expectedNumReplicas;
      this.expectedNumEvictable = expectedNumEvictable;
      this.expectedNumMmapedEvictable = expectedNumMmapedEvictable;
    }

    @Override
    public void visit(int numOutstandingMmaps,
        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
        Map<ExtendedBlockId, InvalidToken> failedLoads,
        Map<Long, ShortCircuitReplica> evictable,
        Map<Long, ShortCircuitReplica> evictableMmapped) {
      if (expectedNumOutstandingMmaps >= 0) {
        Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
      }
      if (expectedNumReplicas >= 0) {
        Assert.assertEquals(expectedNumReplicas, replicas.size());
      }
      if (expectedNumEvictable >= 0) {
        Assert.assertEquals(expectedNumEvictable, evictable.size());
      }
      if (expectedNumMmapedEvictable >= 0) {
        Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size());
      }
    }
  }

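  /**
   * Test the lifecycle of mmaps in the ShortCircuitCache: replicas with open
   * mmaps are not evictable, and released mmaps become evictable and are
   * then closed once the (very short) cache timeout expires.
   */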
  @Test
  public void testZeroCopyMmapCache() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 5 * BLOCK_SIZE;
    final int RANDOM_SEED = 23453;
    final String CONTEXT = "testZeroCopyMmapCacheContext";
    FSDataInputStream fsIn = null;
    ByteBuffer results[] = { null, null, null, null };

    DistributedFileSystem fs = null;
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    try {
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    } catch (InterruptedException e) {
      Assert.fail("unexpected InterruptedException during " +
          "waitReplication: " + e);
    } catch (TimeoutException e) {
      Assert.fail("unexpected TimeoutException during " +
          "waitReplication: " + e);
    }
    fsIn = fs.open(TEST_PATH);
    byte original[] = new byte[TEST_FILE_LENGTH];
    IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
    fsIn.close();
    fsIn = fs.open(TEST_PATH);
    final ShortCircuitCache cache = ClientContext.get(
        CONTEXT, new DFSClient.Conf(conf)).getShortCircuitCache();
    cache.accept(new CountingVisitor(0, 5, 5, 0));
    results[0] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    fsIn.seek(0);
    results[1] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));

    // The mmap should be of the first block of the file.
    final ExtendedBlock firstBlock =
        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
    cache.accept(new CacheVisitor() {
      @Override
      public void visit(int numOutstandingMmaps,
          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
          Map<ExtendedBlockId, InvalidToken> failedLoads,
          Map<Long, ShortCircuitReplica> evictable,
          Map<Long, ShortCircuitReplica> evictableMmapped) {
        ShortCircuitReplica replica = replicas.get(
            new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
        Assert.assertNotNull(replica);
        Assert.assertTrue(replica.hasMmap());
        // The replica should not yet be evictable, since we have it open.
        Assert.assertNull(replica.getEvictableTimeNs());
      }
    });

    // Read more blocks.
    results[2] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    results[3] = fsIn.read(null, BLOCK_SIZE,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));

    // We should now have 3 outstanding mmaps, with 2 replicas still evictable.
    cache.accept(new CountingVisitor(3, 5, 2, 0));

    // After we close the cursors, the mmaps should be evictable for
    // a brief period of time.  Then, they should be closed (we're
    // using a very quick timeout).
    for (ByteBuffer buffer : results) {
      if (buffer != null) {
        fsIn.releaseBuffer(buffer);
      }
    }
    fsIn.close();
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        final MutableBoolean finished = new MutableBoolean(false);
        cache.accept(new CacheVisitor() {
          @Override
          public void visit(int numOutstandingMmaps,
              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
              Map<ExtendedBlockId, InvalidToken> failedLoads,
              Map<Long, ShortCircuitReplica> evictable,
              Map<Long, ShortCircuitReplica> evictableMmapped) {
            finished.setValue(evictableMmapped.isEmpty());
          }
        });
        return finished.booleanValue();
      }
    }, 10, 60000);

    cache.accept(new CountingVisitor(0, -1, -1, -1));

    fs.close();
    cluster.shutdown();
  }


  /**
   * Test HDFS fallback reads.  HDFS streams support the ByteBufferReadable
   * interface.
   */
  @Test
  public void testHdfsFallbackReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;

    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      testFallbackImpl(fsIn, original);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

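  /**
   * A ByteBufferPool which asserts that each buffer request matches the
   * expected type (direct or heap) before allocating it.
   */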
  private static class RestrictedAllocatingByteBufferPool
      implements ByteBufferPool {
    private final boolean direct;

    RestrictedAllocatingByteBufferPool(boolean direct) {
      this.direct = direct;
    }
    @Override
    public ByteBuffer getBuffer(boolean direct, int length) {
      Preconditions.checkArgument(this.direct == direct);
      return direct ? ByteBuffer.allocateDirect(length) :
        ByteBuffer.allocate(length);
    }
    @Override
    public void putBuffer(ByteBuffer buffer) {
      // No-op: this pool does not reuse buffers.
    }
  }

  private static void testFallbackImpl(InputStream stream,
      byte original[]) throws Exception {
    RestrictedAllocatingByteBufferPool bufferPool =
        new RestrictedAllocatingByteBufferPool(
            stream instanceof ByteBufferReadable);

    ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
    Assert.assertEquals(10, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
        byteBufferToArray(result));

    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
    Assert.assertEquals(5000, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
        byteBufferToArray(result));

    // Ask for more than remains in the file; we should get the final
    // 16385 - 5010 = 11375 bytes.
    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
    Assert.assertEquals(11375, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
        byteBufferToArray(result));

    // At end-of-file, fallbackRead should return null.
    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
    Assert.assertNull(result);
  }

  /**
   * Test the {@link ByteBufferUtil#fallbackRead} function directly.
   */
  @Test
  public void testFallbackRead() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;

    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      testFallbackImpl(fsIn, original);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  /**
   * Test fallback reads on a stream which does not support the
   * ByteBufferReadable interface.
   */
  @Test
  public void testIndirectFallbackReads() throws Exception {
    final File TEST_DIR = new File(
      System.getProperty("test.build.data", "build/test/data"));
    final String TEST_PATH = TEST_DIR + File.separator +
        "indirectFallbackTestFile";
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FileOutputStream fos = null;
    FileInputStream fis = null;
    try {
      fos = new FileOutputStream(TEST_PATH);
      Random random = new Random(RANDOM_SEED);
      byte original[] = new byte[TEST_FILE_LENGTH];
      random.nextBytes(original);
      fos.write(original);
      fos.close();
      fos = null;
      fis = new FileInputStream(TEST_PATH);
      testFallbackImpl(fis, original);
    } finally {
      IOUtils.cleanup(LOG, fos, fis);
      new File(TEST_PATH).delete();
    }
  }


  /**
   * Test that we can zero-copy read cached data even without disabling
   * checksums.
   */
  @Test(timeout=120000)
  public void testZeroCopyReadOfCachedData() throws Exception {
    BlockReaderTestUtil.enableShortCircuitShmTracing();
    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
    BlockReaderTestUtil.enableHdfsCachingTracing();

    final int TEST_FILE_LENGTH = BLOCK_SIZE;
    final Path TEST_PATH = new Path("/a");
    final int RANDOM_SEED = 23453;
    HdfsConfiguration conf = initZeroCopyTest();
    conf.setBoolean(DFSConfigKeys.
        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    final String CONTEXT = "testZeroCopyReadOfCachedData";
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH,
          (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize()));
    MiniDFSCluster cluster = null;
    ByteBuffer result = null, result2 = null;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    cluster.waitActive();
    FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
    DistributedFileSystem fs = cluster.getFileSystem();
    DFSTestUtil.createFile(fs, TEST_PATH,
        TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
    DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
    byte original[] = DFSTestUtil.
        calculateFileContentsFromSeed(RANDOM_SEED, TEST_FILE_LENGTH);

    // Prior to caching, the file can't be read via zero-copy
    FSDataInputStream fsIn = fs.open(TEST_PATH);
    try {
      result = fsIn.read(null, TEST_FILE_LENGTH / 2,
          EnumSet.noneOf(ReadOption.class));
      Assert.fail("expected UnsupportedOperationException");
    } catch (UnsupportedOperationException e) {
      // expected
    }
    // Cache the file
    fs.addCachePool(new CachePoolInfo("pool1"));
    long directiveId = fs.addCacheDirective(new CacheDirectiveInfo.Builder().
        setPath(TEST_PATH).
        setReplication((short)1).
        setPool("pool1").
        build());
    int numBlocks = (int)Math.ceil((double)TEST_FILE_LENGTH / BLOCK_SIZE);
    DFSTestUtil.verifyExpectedCacheUsage(
        DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, BLOCK_SIZE),
        numBlocks, cluster.getDataNodes().get(0).getFSDataset());
    try {
      result = fsIn.read(null, TEST_FILE_LENGTH,
          EnumSet.noneOf(ReadOption.class));
    } catch (UnsupportedOperationException e) {
      Assert.fail("expected to be able to read cached file via zero-copy");
    }
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
        BLOCK_SIZE), byteBufferToArray(result));
    // Test that files opened after the cache operation has finished
    // still get the benefits of zero-copy (regression test for HDFS-6086)
    FSDataInputStream fsIn2 = fs.open(TEST_PATH);
    try {
      result2 = fsIn2.read(null, TEST_FILE_LENGTH,
          EnumSet.noneOf(ReadOption.class));
    } catch (UnsupportedOperationException e) {
      Assert.fail("expected to be able to read cached file via zero-copy");
    }
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0,
        BLOCK_SIZE), byteBufferToArray(result2));
    fsIn2.releaseBuffer(result2);
    fsIn2.close();

    // check that the replica is anchored
    final ExtendedBlock firstBlock =
        DFSTestUtil.getFirstBlock(fs, TEST_PATH);
    final ShortCircuitCache cache = ClientContext.get(
        CONTEXT, new DFSClient.Conf(conf)).getShortCircuitCache();
    waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
    // Uncache the replica
    fs.removeCacheDirective(directiveId);
    waitForReplicaAnchorStatus(cache, firstBlock, false, true, 1);
    fsIn.releaseBuffer(result);
    waitForReplicaAnchorStatus(cache, firstBlock, false, false, 1);
    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);

    fsIn.close();
    fs.close();
    cluster.shutdown();
  }

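  /**
   * Wait until the short-circuit replica for the given block reaches the
   * expected anchorable/anchored state, asserting the expected number of
   * outstanding mmaps each time the cache is visited.
   */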
  private void waitForReplicaAnchorStatus(final ShortCircuitCache cache,
      final ExtendedBlock block, final boolean expectedIsAnchorable,
        final boolean expectedIsAnchored, final int expectedOutstandingMmaps)
          throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        final MutableBoolean result = new MutableBoolean(false);
        cache.accept(new CacheVisitor() {
          @Override
          public void visit(int numOutstandingMmaps,
              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
              Map<ExtendedBlockId, InvalidToken> failedLoads,
              Map<Long, ShortCircuitReplica> evictable,
              Map<Long, ShortCircuitReplica> evictableMmapped) {
            Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
            ShortCircuitReplica replica =
                replicas.get(ExtendedBlockId.fromExtendedBlock(block));
            Assert.assertNotNull(replica);
            Slot slot = replica.getSlot();
            if ((expectedIsAnchorable != slot.isAnchorable()) ||
                (expectedIsAnchored != slot.isAnchored())) {
              LOG.info("replica " + replica + " has isAnchorable = " +
                slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() +
                ".  Waiting for isAnchorable = " + expectedIsAnchorable +
                ", isAnchored = " + expectedIsAnchored);
              return;
            }
            result.setValue(true);
          }
        });
        return result.toBoolean();
      }
    }, 10, 60000);
  }

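  /**
   * Test that zero-copy reads fail when client mmaps are disabled, but still
   * succeed when the mmap cache size is set to zero.
   */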
  @Test
  public void testClientMmapDisable() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, false);
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    final String CONTEXT = "testClientMmapDisable";
    FSDataInputStream fsIn = null;
    DistributedFileSystem fs = null;
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);

    try {
      // With DFS_CLIENT_MMAP_ENABLED set to false, we should not do memory
      // mapped reads.
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      fsIn = fs.open(TEST_PATH);
      try {
        fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        Assert.fail("expected zero-copy read to fail when client mmaps " +
            "were disabled.");
      } catch (UnsupportedOperationException e) {
        // expected
      }
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }

    fsIn = null;
    fs = null;
    cluster = null;
    try {
      // Now try again with DFS_CLIENT_MMAP_CACHE_SIZE == 0.  It should work.
      conf.setBoolean(DFS_CLIENT_MMAP_ENABLED, true);
      conf.setInt(DFS_CLIENT_MMAP_CACHE_SIZE, 0);
      conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT + ".1");
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      fsIn = fs.open(TEST_PATH);
      ByteBuffer buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      fsIn.releaseBuffer(buf);
      // Test EOF behavior
      IOUtils.skipFully(fsIn, TEST_FILE_LENGTH - 1);
      buf = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertNull(buf);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

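  /**
   * Test zero-copy reads near the 2 GB mmap limit: reads within a huge block
   * are truncated at the 2 GB boundary, while a file with normal-sized
   * blocks can be read past 2 GB.
   */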
  @Test
  public void test2GBMmapLimit() throws Exception {
    Assume.assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles());
    HdfsConfiguration conf = initZeroCopyTest();
    final long TEST_FILE_LENGTH = 2469605888L;
    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TEST_FILE_LENGTH);
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final String CONTEXT = "test2GBMmapLimit";
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);

    FSDataInputStream fsIn = null, fsIn2 = null;
    ByteBuffer buf1 = null, buf2 = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      DistributedFileSystem fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 0xB);
      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);

      fsIn = fs.open(TEST_PATH);
      buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(1, buf1.remaining());
      fsIn.releaseBuffer(buf1);
      buf1 = null;
      fsIn.seek(2147483640L);
      buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(7, buf1.remaining());
      Assert.assertEquals(Integer.MAX_VALUE, buf1.limit());
      fsIn.releaseBuffer(buf1);
      buf1 = null;
      Assert.assertEquals(2147483647L, fsIn.getPos());
      try {
        buf1 = fsIn.read(null, 1024,
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        Assert.fail("expected UnsupportedOperationException");
      } catch (UnsupportedOperationException e) {
        // expected; can't read past 2GB boundary.
      }
      fsIn.close();
      fsIn = null;

      // Now create another file with normal-sized blocks, and verify we
      // can read past 2GB
      final Path TEST_PATH2 = new Path("/b");
      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 268435456L);
      DFSTestUtil.createFile(fs, TEST_PATH2, 1024 * 1024, TEST_FILE_LENGTH,
          268435456L, (short)1, 0xA);

      fsIn2 = fs.open(TEST_PATH2);
      fsIn2.seek(2147483640L);
      buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(8, buf2.remaining());
      Assert.assertEquals(2147483648L, fsIn2.getPos());
      fsIn2.releaseBuffer(buf2);
      buf2 = null;
      buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(1024, buf2.remaining());
      Assert.assertEquals(2147484672L, fsIn2.getPos());
      fsIn2.releaseBuffer(buf2);
      buf2 = null;
    } finally {
      if (buf1 != null) {
        fsIn.releaseBuffer(buf1);
      }
      if (buf2 != null) {
        fsIn2.releaseBuffer(buf2);
      }
      IOUtils.cleanup(null, fsIn, fsIn2);
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}