/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.EnumSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
 * BlockReaderLocal enables local short-circuited reads. If the DFS client is on
 * the same machine as the datanode, the client can read block files directly
 * from the local file system rather than going through the datanode, which
 * gives better performance. <br>
 * {@link BlockReaderLocal} works as follows:
 * <ul>
 * <li>Short-circuit reads must be enabled on the datanode for the client that
 * performs them.</li>
 * <li>The client gets the file descriptors for the metadata file and the data
 * file for the block using
 * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
 * </li>
 * <li>The client then reads the block data and checksums directly from those
 * file descriptors.</li>
 * </ul>
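 *
 * A minimal construction sketch; the local variable names below are
 * illustrative only (conf is a {@link DFSClient.Conf}), and in practice this
 * reader is built by {@link BlockReaderFactory}:
 * <pre>{@code
 * BlockReaderLocal reader = new BlockReaderLocal.Builder(conf)
 *     .setFilename(pathName)
 *     .setShortCircuitReplica(replica)   // required; build() checks for it
 *     .setStartOffset(startOffset)
 *     .setBlock(block)
 *     .setStorageType(storageType)
 *     .setVerifyChecksum(verifyChecksum)
 *     .setCachingStrategy(cachingStrategy)
 *     .build();
 * }</pre>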
 */
class BlockReaderLocal implements BlockReader {
  static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);

  private static final DirectBufferPool bufferPool = new DirectBufferPool();

  public static class Builder {
    private final int bufferSize;
    private boolean verifyChecksum;
    private int maxReadahead;
    private String filename;
    private ShortCircuitReplica replica;
    private long dataPos;
    private ExtendedBlock block;
    private StorageType storageType;

    public Builder(Conf conf) {
      this.maxReadahead = Integer.MAX_VALUE;
      this.verifyChecksum = !conf.skipShortCircuitChecksums;
      this.bufferSize = conf.shortCircuitBufferSize;
    }

    public Builder setVerifyChecksum(boolean verifyChecksum) {
      this.verifyChecksum = verifyChecksum;
      return this;
    }

    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
      long readahead = cachingStrategy.getReadahead() != null ?
          cachingStrategy.getReadahead() :
              DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
      return this;
    }

    public Builder setFilename(String filename) {
      this.filename = filename;
      return this;
    }

    public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
      this.replica = replica;
      return this;
    }

    public Builder setStartOffset(long startOffset) {
      this.dataPos = Math.max(0, startOffset);
      return this;
    }

    public Builder setBlock(ExtendedBlock block) {
      this.block = block;
      return this;
    }

    public Builder setStorageType(StorageType storageType) {
      this.storageType = storageType;
      return this;
    }

    public BlockReaderLocal build() {
      Preconditions.checkNotNull(replica);
      return new BlockReaderLocal(this);
    }
  }

  private boolean closed = false;

  /**
   * Pair of streams for this block.
   */
  private final ShortCircuitReplica replica;

  /**
   * The data FileChannel.
   */
  private final FileChannel dataIn;

  /**
   * The next place we'll read from in the block data FileChannel.
   *
   * If data is buffered in dataBuf, this offset will be larger than the
   * offset of the next byte which a read() operation will give us.
   */
  private long dataPos;

  /**
   * The Checksum FileChannel.
   */
  private final FileChannel checksumIn;

  /**
   * Checksum type and size.
   */
  private final DataChecksum checksum;

  /**
   * If false, we will always skip the checksum.
   */
  private final boolean verifyChecksum;
  /**
   * Name of the file containing this block, for logging purposes.
   */
  private final String filename;

  /**
   * Block ID and Block Pool ID.
   */
  private final ExtendedBlock block;

  /**
   * Cache of Checksum#bytesPerChecksum.
   */
  private final int bytesPerChecksum;

  /**
   * Cache of Checksum#checksumSize.
   */
  private final int checksumSize;

  /**
   * Maximum number of chunks to allocate.
   *
   * This is used to allocate dataBuf and checksumBuf, in the event that
   * we need them.
   */
  private final int maxAllocatedChunks;

  /**
   * True if zero readahead was requested.
   */
  private final boolean zeroReadaheadRequested;

  /**
   * Maximum amount of readahead we'll do.  This will always be at least the
   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true,
   * because we need to do a certain amount of buffering in order to do
   * checksumming.
   *
   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
   * Why do we allocate buffers, and then (potentially) only use part of them?
   * The rationale is that allocating a lot of buffers of different sizes would
   * make it very difficult for the DirectBufferPool to re-use buffers.
   */
  private final int maxReadaheadLength;

  /**
   * Buffers data starting at the current dataPos and extending on
   * for dataBuf.limit().
   *
   * This may be null if we don't need it.
   */
  private ByteBuffer dataBuf;

  /**
   * Buffers checksums starting at the current checksumPos and extending on
   * for checksumBuf.limit().
   *
   * This may be null if we don't need it.
   */
  private ByteBuffer checksumBuf;

  /**
   * StorageType of replica on DataNode.
   */
  private StorageType storageType;

  private BlockReaderLocal(Builder builder) {
    this.replica = builder.replica;
    this.dataIn = replica.getDataStream().getChannel();
    this.dataPos = builder.dataPos;
    this.checksumIn = replica.getMetaStream().getChannel();
    BlockMetadataHeader header = builder.replica.getMetaHeader();
    this.checksum = header.getChecksum();
    this.verifyChecksum = builder.verifyChecksum &&
        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
    this.filename = builder.filename;
    this.block = builder.block;
    this.bytesPerChecksum = checksum.getBytesPerChecksum();
    this.checksumSize = checksum.getChecksumSize();

    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
    // Calculate the effective maximum readahead.
    // We can't do more readahead than there is space in the buffer.
    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
        ((Math.min(builder.bufferSize, builder.maxReadahead) +
            bytesPerChecksum - 1) / bytesPerChecksum);
    if (maxReadaheadChunks == 0) {
      this.zeroReadaheadRequested = true;
      maxReadaheadChunks = 1;
    } else {
      this.zeroReadaheadRequested = false;
    }
    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
    this.storageType = builder.storageType;
  }

  private synchronized void createDataBufIfNeeded() {
    if (dataBuf == null) {
      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
      dataBuf.position(0);
      dataBuf.limit(0);
    }
  }

  private synchronized void freeDataBufIfExists() {
    if (dataBuf != null) {
      // When disposing of a dataBuf, we have to move our stored file index
      // backwards.
      dataPos -= dataBuf.remaining();
      dataBuf.clear();
      bufferPool.returnBuffer(dataBuf);
      dataBuf = null;
    }
  }

  private synchronized void createChecksumBufIfNeeded() {
    if (checksumBuf == null) {
      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
      checksumBuf.position(0);
      checksumBuf.limit(0);
    }
  }

  private synchronized void freeChecksumBufIfExists() {
    if (checksumBuf != null) {
      checksumBuf.clear();
      bufferPool.returnBuffer(checksumBuf);
      checksumBuf = null;
    }
  }

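  /**
   * Copy as much buffered data as possible from dataBuf into the given buffer.
   *
   * @param buf  The buffer to copy into.
   *
   * @return     The number of bytes copied; 0 if the destination buffer is
   *             full but buffered data remains; -1 if there is no buffered
   *             data to drain.
   */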
  private synchronized int drainDataBuf(ByteBuffer buf) {
    if (dataBuf == null) return -1;
    int oldLimit = dataBuf.limit();
    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
    if (nRead == 0) {
      return (dataBuf.remaining() == 0) ? -1 : 0;
    }
    try {
      dataBuf.limit(dataBuf.position() + nRead);
      buf.put(dataBuf);
    } finally {
      dataBuf.limit(oldLimit);
    }
    return nRead;
  }

  /**
   * Read from the block file into a buffer.
   *
   * This function overwrites checksumBuf.  It will increment dataPos.
   *
   * @param buf   The buffer to read into.  May be dataBuf.
   *              The position and limit of this buffer should be set to
   *              multiples of the checksum size.
   * @param canSkipChecksum  True if we can skip checksumming.
   *
   * @return      Total bytes read.  0 on EOF.
   */
  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
      throws IOException {
    TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
        block.getBlockId() + ")", Sampler.NEVER);
    try {
      int total = 0;
      long startDataPos = dataPos;
      int startBufPos = buf.position();
      while (buf.hasRemaining()) {
        int nRead = dataIn.read(buf, dataPos);
        if (nRead < 0) {
          break;
        }
        dataPos += nRead;
        total += nRead;
      }
      if (canSkipChecksum) {
        freeChecksumBufIfExists();
        return total;
      }
      if (total > 0) {
        try {
          buf.limit(buf.position());
          buf.position(startBufPos);
          createChecksumBufIfNeeded();
          int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
          checksumBuf.clear();
          checksumBuf.limit(checksumsNeeded * checksumSize);
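          // Checksums are stored in the metadata file as one checksumSize-byte
          // entry per bytesPerChecksum-byte data chunk, following a fixed-size
          // header.  Seek to the entry covering the chunk that contains
          // startDataPos.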
          long checksumPos = BlockMetadataHeader.getHeaderSize()
              + ((startDataPos / bytesPerChecksum) * checksumSize);
          while (checksumBuf.hasRemaining()) {
            int nRead = checksumIn.read(checksumBuf, checksumPos);
            if (nRead < 0) {
              throw new IOException("Got unexpected checksum file EOF at " +
                  checksumPos + ", block file position " + startDataPos + " for " +
                  "block " + block + " of file " + filename);
            }
            checksumPos += nRead;
          }
          checksumBuf.flip();

          checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
        } finally {
          buf.position(buf.limit());
        }
      }
      return total;
    } finally {
      scope.close();
    }
  }

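  /**
   * Try to set up a context in which checksum verification can be skipped.
   *
   * If this reader was configured to verify checksums, we may still skip them
   * when the replica is anchored: the anchor succeeds only if the block is
   * mlocked on the DataNode, in which case its data has already been
   * checksummed.  Replicas on transient storage carry no checksums at all,
   * so no anchor is needed for them.
   *
   * @return  True if checksums can be skipped for this read.
   */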
  private boolean createNoChecksumContext() {
    if (verifyChecksum) {
      if (storageType != null && storageType.isTransient()) {
        // Checksums are not stored for replicas on transient storage.  We do not
        // anchor, because we do not intend for client activity to block eviction
        // from transient storage on the DataNode side.
        return true;
      } else {
        return replica.addNoChecksumAnchor();
      }
    } else {
      return true;
    }
  }

  private void releaseNoChecksumContext() {
    if (verifyChecksum) {
      if (storageType == null || !storageType.isTransient()) {
        replica.removeNoChecksumAnchor();
      }
    }
  }

  @Override
  public synchronized int read(ByteBuffer buf) throws IOException {
    boolean canSkipChecksum = createNoChecksumContext();
    try {
      String traceString = null;
      if (LOG.isTraceEnabled()) {
        traceString = new StringBuilder().
            append("read(").
            append("buf.remaining=").append(buf.remaining()).
            append(", block=").append(block).
            append(", filename=").append(filename).
            append(", canSkipChecksum=").append(canSkipChecksum).
            append(")").toString();
        LOG.trace(traceString + ": starting");
      }
      int nRead;
      try {
        if (canSkipChecksum && zeroReadaheadRequested) {
          nRead = readWithoutBounceBuffer(buf);
        } else {
          nRead = readWithBounceBuffer(buf, canSkipChecksum);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(traceString + ": I/O error", e);
        }
        throw e;
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace(traceString + ": returning " + nRead);
      }
      return nRead;
    } finally {
      if (canSkipChecksum) releaseNoChecksumContext();
    }
  }


  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
      throws IOException {
    freeDataBufIfExists();
    freeChecksumBufIfExists();
    int total = 0;
    while (buf.hasRemaining()) {
      int nRead = dataIn.read(buf, dataPos);
      if (nRead <= 0) break;
      dataPos += nRead;
      total += nRead;
    }
    return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
  }

  /**
   * Fill the data buffer.  If necessary, validate the data against the
   * checksums.
   *
   * We always want the offsets of the data contained in dataBuf to be
   * aligned to the chunk boundary.  If we are validating checksums, we
   * accomplish this by seeking backwards in the file until we're on a
   * chunk boundary.  (This is necessary because we can't checksum a
   * partial chunk.)  If we are not validating checksums, we simply fill
   * the latter part of dataBuf.
   *
   * @param canSkipChecksum  true if we can skip checksumming.
   * @return                 true if we hit EOF.
   * @throws IOException
   */
  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
      throws IOException {
    createDataBufIfNeeded();
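    // 'slop' is how far dataPos extends past the previous chunk boundary;
    // it is 0 when the read position is already chunk-aligned.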
    final int slop = (int)(dataPos % bytesPerChecksum);
    final long oldDataPos = dataPos;
    dataBuf.limit(maxReadaheadLength);
    if (canSkipChecksum) {
      dataBuf.position(slop);
      fillBuffer(dataBuf, canSkipChecksum);
    } else {
      dataPos -= slop;
      dataBuf.position(0);
      fillBuffer(dataBuf, canSkipChecksum);
    }
    dataBuf.limit(dataBuf.position());
    dataBuf.position(Math.min(dataBuf.position(), slop));
    if (LOG.isTraceEnabled()) {
      LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
          "buffer from offset " + oldDataPos + " of " + block);
    }
    return dataBuf.limit() != maxReadaheadLength;
  }

  /**
   * Read using the bounce buffer.
   *
   * A 'direct' read actually has three phases. The first drains any
   * remaining bytes from the slow read buffer. After this the read is
   * guaranteed to be on a checksum chunk boundary. If there are still bytes
   * to read, the fast direct path is used for as many remaining bytes as
   * possible, up to a multiple of the checksum chunk size. Finally, any
   * 'odd' bytes remaining at the end of the read cause another slow read to
   * be issued, which involves an extra copy.
   *
   * Every 'slow' read tries to fill the slow read buffer in one go for
   * efficiency's sake. As described above, all non-checksum-chunk-aligned
   * reads will be served from the slower read path.
   *
   * @param buf              The buffer to read into.
   * @param canSkipChecksum  True if we can skip checksums.
   */
  private synchronized int readWithBounceBuffer(ByteBuffer buf,
        boolean canSkipChecksum) throws IOException {
    int total = 0;
    int bb = drainDataBuf(buf); // drain bounce buffer if possible
    if (bb >= 0) {
      total += bb;
      if (buf.remaining() == 0) return total;
    }
    boolean eof = true, done = false;
    do {
      if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
            && ((dataPos % bytesPerChecksum) == 0)) {
        // Fast lane: try to read directly into user-supplied buffer, bypassing
        // bounce buffer.
        int oldLimit = buf.limit();
        int nRead;
        try {
          buf.limit(buf.position() + maxReadaheadLength);
          nRead = fillBuffer(buf, canSkipChecksum);
        } finally {
          buf.limit(oldLimit);
        }
        if (nRead < maxReadaheadLength) {
          done = true;
        }
        if (nRead > 0) {
          eof = false;
        }
        total += nRead;
      } else {
        // Slow lane: refill bounce buffer.
        if (fillDataBuf(canSkipChecksum)) {
          done = true;
        }
        bb = drainDataBuf(buf); // drain bounce buffer if possible
        if (bb >= 0) {
          eof = false;
          total += bb;
        }
      }
    } while ((!done) && (buf.remaining() > 0));
    return (eof && total == 0) ? -1 : total;
  }

  @Override
  public synchronized int read(byte[] arr, int off, int len)
        throws IOException {
    boolean canSkipChecksum = createNoChecksumContext();
    int nRead;
    try {
      String traceString = null;
      if (LOG.isTraceEnabled()) {
        traceString = new StringBuilder().
            append("read(arr.length=").append(arr.length).
            append(", off=").append(off).
            append(", len=").append(len).
            append(", filename=").append(filename).
            append(", block=").append(block).
            append(", canSkipChecksum=").append(canSkipChecksum).
            append(")").toString();
        LOG.trace(traceString + ": starting");
      }
      try {
        if (canSkipChecksum && zeroReadaheadRequested) {
          nRead = readWithoutBounceBuffer(arr, off, len);
        } else {
          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
        }
      } catch (IOException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(traceString + ": I/O error", e);
        }
        throw e;
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace(traceString + ": returning " + nRead);
      }
    } finally {
      if (canSkipChecksum) releaseNoChecksumContext();
    }
    return nRead;
  }


  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
        int len) throws IOException {
    freeDataBufIfExists();
    freeChecksumBufIfExists();
    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
    if (nRead > 0) {
      dataPos += nRead;
    } else if ((nRead == 0) && (dataPos == dataIn.size())) {
      return -1;
    }
    return nRead;
  }

  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
        boolean canSkipChecksum) throws IOException {
    createDataBufIfNeeded();
    if (!dataBuf.hasRemaining()) {
      dataBuf.position(0);
      dataBuf.limit(maxReadaheadLength);
      fillDataBuf(canSkipChecksum);
    }
    if (dataBuf.remaining() == 0) return -1;
    int toRead = Math.min(dataBuf.remaining(), len);
    dataBuf.get(arr, off, toRead);
    return toRead;
  }


  @Override
  public synchronized long skip(long n) throws IOException {
    int discardedFromBuf = 0;
    long remaining = n;
    if ((dataBuf != null) && dataBuf.hasRemaining()) {
      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
      dataBuf.position(dataBuf.position() + discardedFromBuf);
      remaining -= discardedFromBuf;
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
        filename + "): discarded " + discardedFromBuf + " bytes from " +
        "dataBuf and advanced dataPos by " + remaining);
    }
    dataPos += remaining;
    return n;
  }

  @Override
  public int available() throws IOException {
    // We never do network I/O in BlockReaderLocal.
    return Integer.MAX_VALUE;
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) return;
    closed = true;
    if (LOG.isTraceEnabled()) {
      LOG.trace("close(filename=" + filename + ", block=" + block + ")");
    }
    replica.unref();
    freeDataBufIfExists();
    freeChecksumBufIfExists();
  }


  @Override
  public synchronized void readFully(byte[] arr, int off, int len)
      throws IOException {
    BlockReaderUtil.readFully(this, arr, off, len);
  }

  @Override
  public synchronized int readAll(byte[] buf, int off, int len)
      throws IOException {
    return BlockReaderUtil.readAll(this, buf, off, len);
  }

  @Override
  public boolean isLocal() {
    return true;
  }

  @Override
  public boolean isShortCircuit() {
    return true;
  }

  /**
   * Get or create a memory map for this replica.
   *
   * There are two kinds of ClientMmap objects we could fetch here: one that
   * will always read pre-checksummed data, and one that may read data that
   * hasn't been checksummed.
   *
   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
   * the anchor count on the shared memory slot.  This will tell the DataNode
   * not to munlock the block until this ClientMmap is closed.
   * If we fetch the latter, we don't bother with anchoring.
   *
   * @param opts     The options to use, such as SKIP_CHECKSUMS.
   *
   * @return         null on failure; the ClientMmap otherwise.
   */
  @Override
  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
    boolean anchor = verifyChecksum &&
        !opts.contains(ReadOption.SKIP_CHECKSUMS);
    if (anchor) {
      if (!createNoChecksumContext()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("can't get an mmap for " + block + " of " + filename +
              " since SKIP_CHECKSUMS was not given, " +
              "we aren't skipping checksums, and the block is not mlocked.");
        }
        return null;
      }
    }
    ClientMmap clientMmap = null;
    try {
      clientMmap = replica.getOrCreateClientMmap(anchor);
    } finally {
      if ((clientMmap == null) && anchor) {
        releaseNoChecksumContext();
      }
    }
    return clientMmap;
  }

  @VisibleForTesting
  boolean getVerifyChecksum() {
    return this.verifyChecksum;
  }

  @VisibleForTesting
  int getMaxReadaheadLength() {
    return this.maxReadaheadLength;
  }

  /**
   * Make the replica anchorable.  Normally this can only be done by the
   * DataNode.  This method is only for testing.
   */
  @VisibleForTesting
  void forceAnchorable() {
    replica.getSlot().makeAnchorable();
  }

  /**
   * Make the replica unanchorable.  Normally this can only be done by the
   * DataNode.  This method is only for testing.
   */
  @VisibleForTesting
  void forceUnanchorable() {
    replica.getSlot().makeUnanchorable();
  }
}