1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import java.io.IOException; 21 import java.nio.ByteBuffer; 22 import java.nio.channels.FileChannel; 23 import java.util.EnumSet; 24 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import org.apache.hadoop.fs.ReadOption; 28 import org.apache.hadoop.fs.StorageType; 29 import org.apache.hadoop.hdfs.DFSClient.Conf; 30 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 31 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; 32 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 33 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; 34 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; 35 import org.apache.hadoop.util.DirectBufferPool; 36 import org.apache.hadoop.util.DataChecksum; 37 import org.apache.htrace.Sampler; 38 import org.apache.htrace.Trace; 39 import org.apache.htrace.TraceScope; 40 41 import com.google.common.annotations.VisibleForTesting; 42 import com.google.common.base.Preconditions; 43 44 /** 45 * BlockReaderLocal enables local short circuited reads. If the DFS client is on 46 * the same machine as the datanode, then the client can read files directly 47 * from the local file system rather than going through the datanode for better 48 * performance. <br> 49 * {@link BlockReaderLocal} works as follows: 50 * <ul> 51 * <li>The client performing short circuit reads must be configured at the 52 * datanode.</li> 53 * <li>The client gets the file descriptors for the metadata file and the data 54 * file for the block using 55 * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}. 56 * </li> 57 * <li>The client reads the file descriptors.</li> 58 * </ul> 59 */ 60 class BlockReaderLocal implements BlockReader { 61 static final Log LOG = LogFactory.getLog(BlockReaderLocal.class); 62 63 private static final DirectBufferPool bufferPool = new DirectBufferPool(); 64 65 public static class Builder { 66 private final int bufferSize; 67 private boolean verifyChecksum; 68 private int maxReadahead; 69 private String filename; 70 private ShortCircuitReplica replica; 71 private long dataPos; 72 private ExtendedBlock block; 73 private StorageType storageType; 74 Builder(Conf conf)75 public Builder(Conf conf) { 76 this.maxReadahead = Integer.MAX_VALUE; 77 this.verifyChecksum = !conf.skipShortCircuitChecksums; 78 this.bufferSize = conf.shortCircuitBufferSize; 79 } 80 setVerifyChecksum(boolean verifyChecksum)81 public Builder setVerifyChecksum(boolean verifyChecksum) { 82 this.verifyChecksum = verifyChecksum; 83 return this; 84 } 85 setCachingStrategy(CachingStrategy cachingStrategy)86 public Builder setCachingStrategy(CachingStrategy cachingStrategy) { 87 long readahead = cachingStrategy.getReadahead() != null ? 88 cachingStrategy.getReadahead() : 89 DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; 90 this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); 91 return this; 92 } 93 setFilename(String filename)94 public Builder setFilename(String filename) { 95 this.filename = filename; 96 return this; 97 } 98 setShortCircuitReplica(ShortCircuitReplica replica)99 public Builder setShortCircuitReplica(ShortCircuitReplica replica) { 100 this.replica = replica; 101 return this; 102 } 103 setStartOffset(long startOffset)104 public Builder setStartOffset(long startOffset) { 105 this.dataPos = Math.max(0, startOffset); 106 return this; 107 } 108 setBlock(ExtendedBlock block)109 public Builder setBlock(ExtendedBlock block) { 110 this.block = block; 111 return this; 112 } 113 setStorageType(StorageType storageType)114 public Builder setStorageType(StorageType storageType) { 115 this.storageType = storageType; 116 return this; 117 } 118 build()119 public BlockReaderLocal build() { 120 Preconditions.checkNotNull(replica); 121 return new BlockReaderLocal(this); 122 } 123 } 124 125 private boolean closed = false; 126 127 /** 128 * Pair of streams for this block. 129 */ 130 private final ShortCircuitReplica replica; 131 132 /** 133 * The data FileChannel. 134 */ 135 private final FileChannel dataIn; 136 137 /** 138 * The next place we'll read from in the block data FileChannel. 139 * 140 * If data is buffered in dataBuf, this offset will be larger than the 141 * offset of the next byte which a read() operation will give us. 142 */ 143 private long dataPos; 144 145 /** 146 * The Checksum FileChannel. 147 */ 148 private final FileChannel checksumIn; 149 150 /** 151 * Checksum type and size. 152 */ 153 private final DataChecksum checksum; 154 155 /** 156 * If false, we will always skip the checksum. 157 */ 158 private final boolean verifyChecksum; 159 160 /** 161 * Name of the block, for logging purposes. 162 */ 163 private final String filename; 164 165 /** 166 * Block ID and Block Pool ID. 167 */ 168 private final ExtendedBlock block; 169 170 /** 171 * Cache of Checksum#bytesPerChecksum. 172 */ 173 private final int bytesPerChecksum; 174 175 /** 176 * Cache of Checksum#checksumSize. 177 */ 178 private final int checksumSize; 179 180 /** 181 * Maximum number of chunks to allocate. 182 * 183 * This is used to allocate dataBuf and checksumBuf, in the event that 184 * we need them. 185 */ 186 private final int maxAllocatedChunks; 187 188 /** 189 * True if zero readahead was requested. 190 */ 191 private final boolean zeroReadaheadRequested; 192 193 /** 194 * Maximum amount of readahead we'll do. This will always be at least the, 195 * size of a single chunk, even if {@link #zeroReadaheadRequested} is true. 196 * The reason is because we need to do a certain amount of buffering in order 197 * to do checksumming. 198 * 199 * This determines how many bytes we'll use out of dataBuf and checksumBuf. 200 * Why do we allocate buffers, and then (potentially) only use part of them? 201 * The rationale is that allocating a lot of buffers of different sizes would 202 * make it very difficult for the DirectBufferPool to re-use buffers. 203 */ 204 private final int maxReadaheadLength; 205 206 /** 207 * Buffers data starting at the current dataPos and extending on 208 * for dataBuf.limit(). 209 * 210 * This may be null if we don't need it. 211 */ 212 private ByteBuffer dataBuf; 213 214 /** 215 * Buffers checksums starting at the current checksumPos and extending on 216 * for checksumBuf.limit(). 217 * 218 * This may be null if we don't need it. 219 */ 220 private ByteBuffer checksumBuf; 221 222 /** 223 * StorageType of replica on DataNode. 224 */ 225 private StorageType storageType; 226 BlockReaderLocal(Builder builder)227 private BlockReaderLocal(Builder builder) { 228 this.replica = builder.replica; 229 this.dataIn = replica.getDataStream().getChannel(); 230 this.dataPos = builder.dataPos; 231 this.checksumIn = replica.getMetaStream().getChannel(); 232 BlockMetadataHeader header = builder.replica.getMetaHeader(); 233 this.checksum = header.getChecksum(); 234 this.verifyChecksum = builder.verifyChecksum && 235 (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); 236 this.filename = builder.filename; 237 this.block = builder.block; 238 this.bytesPerChecksum = checksum.getBytesPerChecksum(); 239 this.checksumSize = checksum.getChecksumSize(); 240 241 this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : 242 ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); 243 // Calculate the effective maximum readahead. 244 // We can't do more readahead than there is space in the buffer. 245 int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 : 246 ((Math.min(builder.bufferSize, builder.maxReadahead) + 247 bytesPerChecksum - 1) / bytesPerChecksum); 248 if (maxReadaheadChunks == 0) { 249 this.zeroReadaheadRequested = true; 250 maxReadaheadChunks = 1; 251 } else { 252 this.zeroReadaheadRequested = false; 253 } 254 this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; 255 this.storageType = builder.storageType; 256 } 257 createDataBufIfNeeded()258 private synchronized void createDataBufIfNeeded() { 259 if (dataBuf == null) { 260 dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); 261 dataBuf.position(0); 262 dataBuf.limit(0); 263 } 264 } 265 freeDataBufIfExists()266 private synchronized void freeDataBufIfExists() { 267 if (dataBuf != null) { 268 // When disposing of a dataBuf, we have to move our stored file index 269 // backwards. 270 dataPos -= dataBuf.remaining(); 271 dataBuf.clear(); 272 bufferPool.returnBuffer(dataBuf); 273 dataBuf = null; 274 } 275 } 276 createChecksumBufIfNeeded()277 private synchronized void createChecksumBufIfNeeded() { 278 if (checksumBuf == null) { 279 checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); 280 checksumBuf.position(0); 281 checksumBuf.limit(0); 282 } 283 } 284 freeChecksumBufIfExists()285 private synchronized void freeChecksumBufIfExists() { 286 if (checksumBuf != null) { 287 checksumBuf.clear(); 288 bufferPool.returnBuffer(checksumBuf); 289 checksumBuf = null; 290 } 291 } 292 drainDataBuf(ByteBuffer buf)293 private synchronized int drainDataBuf(ByteBuffer buf) { 294 if (dataBuf == null) return -1; 295 int oldLimit = dataBuf.limit(); 296 int nRead = Math.min(dataBuf.remaining(), buf.remaining()); 297 if (nRead == 0) { 298 return (dataBuf.remaining() == 0) ? -1 : 0; 299 } 300 try { 301 dataBuf.limit(dataBuf.position() + nRead); 302 buf.put(dataBuf); 303 } finally { 304 dataBuf.limit(oldLimit); 305 } 306 return nRead; 307 } 308 309 /** 310 * Read from the block file into a buffer. 311 * 312 * This function overwrites checksumBuf. It will increment dataPos. 313 * 314 * @param buf The buffer to read into. May be dataBuf. 315 * The position and limit of this buffer should be set to 316 * multiples of the checksum size. 317 * @param canSkipChecksum True if we can skip checksumming. 318 * 319 * @return Total bytes read. 0 on EOF. 320 */ fillBuffer(ByteBuffer buf, boolean canSkipChecksum)321 private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) 322 throws IOException { 323 TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" + 324 block.getBlockId() + ")", Sampler.NEVER); 325 try { 326 int total = 0; 327 long startDataPos = dataPos; 328 int startBufPos = buf.position(); 329 while (buf.hasRemaining()) { 330 int nRead = dataIn.read(buf, dataPos); 331 if (nRead < 0) { 332 break; 333 } 334 dataPos += nRead; 335 total += nRead; 336 } 337 if (canSkipChecksum) { 338 freeChecksumBufIfExists(); 339 return total; 340 } 341 if (total > 0) { 342 try { 343 buf.limit(buf.position()); 344 buf.position(startBufPos); 345 createChecksumBufIfNeeded(); 346 int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; 347 checksumBuf.clear(); 348 checksumBuf.limit(checksumsNeeded * checksumSize); 349 long checksumPos = BlockMetadataHeader.getHeaderSize() 350 + ((startDataPos / bytesPerChecksum) * checksumSize); 351 while (checksumBuf.hasRemaining()) { 352 int nRead = checksumIn.read(checksumBuf, checksumPos); 353 if (nRead < 0) { 354 throw new IOException("Got unexpected checksum file EOF at " + 355 checksumPos + ", block file position " + startDataPos + " for " + 356 "block " + block + " of file " + filename); 357 } 358 checksumPos += nRead; 359 } 360 checksumBuf.flip(); 361 362 checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); 363 } finally { 364 buf.position(buf.limit()); 365 } 366 } 367 return total; 368 } finally { 369 scope.close(); 370 } 371 } 372 createNoChecksumContext()373 private boolean createNoChecksumContext() { 374 if (verifyChecksum) { 375 if (storageType != null && storageType.isTransient()) { 376 // Checksums are not stored for replicas on transient storage. We do not 377 // anchor, because we do not intend for client activity to block eviction 378 // from transient storage on the DataNode side. 379 return true; 380 } else { 381 return replica.addNoChecksumAnchor(); 382 } 383 } else { 384 return true; 385 } 386 } 387 releaseNoChecksumContext()388 private void releaseNoChecksumContext() { 389 if (verifyChecksum) { 390 if (storageType == null || !storageType.isTransient()) { 391 replica.removeNoChecksumAnchor(); 392 } 393 } 394 } 395 396 @Override read(ByteBuffer buf)397 public synchronized int read(ByteBuffer buf) throws IOException { 398 boolean canSkipChecksum = createNoChecksumContext(); 399 try { 400 String traceString = null; 401 if (LOG.isTraceEnabled()) { 402 traceString = new StringBuilder(). 403 append("read("). 404 append("buf.remaining=").append(buf.remaining()). 405 append(", block=").append(block). 406 append(", filename=").append(filename). 407 append(", canSkipChecksum=").append(canSkipChecksum). 408 append(")").toString(); 409 LOG.info(traceString + ": starting"); 410 } 411 int nRead; 412 try { 413 if (canSkipChecksum && zeroReadaheadRequested) { 414 nRead = readWithoutBounceBuffer(buf); 415 } else { 416 nRead = readWithBounceBuffer(buf, canSkipChecksum); 417 } 418 } catch (IOException e) { 419 if (LOG.isTraceEnabled()) { 420 LOG.info(traceString + ": I/O error", e); 421 } 422 throw e; 423 } 424 if (LOG.isTraceEnabled()) { 425 LOG.info(traceString + ": returning " + nRead); 426 } 427 return nRead; 428 } finally { 429 if (canSkipChecksum) releaseNoChecksumContext(); 430 } 431 } 432 readWithoutBounceBuffer(ByteBuffer buf)433 private synchronized int readWithoutBounceBuffer(ByteBuffer buf) 434 throws IOException { 435 freeDataBufIfExists(); 436 freeChecksumBufIfExists(); 437 int total = 0; 438 while (buf.hasRemaining()) { 439 int nRead = dataIn.read(buf, dataPos); 440 if (nRead <= 0) break; 441 dataPos += nRead; 442 total += nRead; 443 } 444 return (total == 0 && (dataPos == dataIn.size())) ? -1 : total; 445 } 446 447 /** 448 * Fill the data buffer. If necessary, validate the data against the 449 * checksums. 450 * 451 * We always want the offsets of the data contained in dataBuf to be 452 * aligned to the chunk boundary. If we are validating checksums, we 453 * accomplish this by seeking backwards in the file until we're on a 454 * chunk boundary. (This is necessary because we can't checksum a 455 * partial chunk.) If we are not validating checksums, we simply only 456 * fill the latter part of dataBuf. 457 * 458 * @param canSkipChecksum true if we can skip checksumming. 459 * @return true if we hit EOF. 460 * @throws IOException 461 */ fillDataBuf(boolean canSkipChecksum)462 private synchronized boolean fillDataBuf(boolean canSkipChecksum) 463 throws IOException { 464 createDataBufIfNeeded(); 465 final int slop = (int)(dataPos % bytesPerChecksum); 466 final long oldDataPos = dataPos; 467 dataBuf.limit(maxReadaheadLength); 468 if (canSkipChecksum) { 469 dataBuf.position(slop); 470 fillBuffer(dataBuf, canSkipChecksum); 471 } else { 472 dataPos -= slop; 473 dataBuf.position(0); 474 fillBuffer(dataBuf, canSkipChecksum); 475 } 476 dataBuf.limit(dataBuf.position()); 477 dataBuf.position(Math.min(dataBuf.position(), slop)); 478 if (LOG.isTraceEnabled()) { 479 LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " + 480 "buffer from offset " + oldDataPos + " of " + block); 481 } 482 return dataBuf.limit() != maxReadaheadLength; 483 } 484 485 /** 486 * Read using the bounce buffer. 487 * 488 * A 'direct' read actually has three phases. The first drains any 489 * remaining bytes from the slow read buffer. After this the read is 490 * guaranteed to be on a checksum chunk boundary. If there are still bytes 491 * to read, the fast direct path is used for as many remaining bytes as 492 * possible, up to a multiple of the checksum chunk size. Finally, any 493 * 'odd' bytes remaining at the end of the read cause another slow read to 494 * be issued, which involves an extra copy. 495 * 496 * Every 'slow' read tries to fill the slow read buffer in one go for 497 * efficiency's sake. As described above, all non-checksum-chunk-aligned 498 * reads will be served from the slower read path. 499 * 500 * @param buf The buffer to read into. 501 * @param canSkipChecksum True if we can skip checksums. 502 */ readWithBounceBuffer(ByteBuffer buf, boolean canSkipChecksum)503 private synchronized int readWithBounceBuffer(ByteBuffer buf, 504 boolean canSkipChecksum) throws IOException { 505 int total = 0; 506 int bb = drainDataBuf(buf); // drain bounce buffer if possible 507 if (bb >= 0) { 508 total += bb; 509 if (buf.remaining() == 0) return total; 510 } 511 boolean eof = true, done = false; 512 do { 513 if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength) 514 && ((dataPos % bytesPerChecksum) == 0)) { 515 // Fast lane: try to read directly into user-supplied buffer, bypassing 516 // bounce buffer. 517 int oldLimit = buf.limit(); 518 int nRead; 519 try { 520 buf.limit(buf.position() + maxReadaheadLength); 521 nRead = fillBuffer(buf, canSkipChecksum); 522 } finally { 523 buf.limit(oldLimit); 524 } 525 if (nRead < maxReadaheadLength) { 526 done = true; 527 } 528 if (nRead > 0) { 529 eof = false; 530 } 531 total += nRead; 532 } else { 533 // Slow lane: refill bounce buffer. 534 if (fillDataBuf(canSkipChecksum)) { 535 done = true; 536 } 537 bb = drainDataBuf(buf); // drain bounce buffer if possible 538 if (bb >= 0) { 539 eof = false; 540 total += bb; 541 } 542 } 543 } while ((!done) && (buf.remaining() > 0)); 544 return (eof && total == 0) ? -1 : total; 545 } 546 547 @Override read(byte[] arr, int off, int len)548 public synchronized int read(byte[] arr, int off, int len) 549 throws IOException { 550 boolean canSkipChecksum = createNoChecksumContext(); 551 int nRead; 552 try { 553 String traceString = null; 554 if (LOG.isTraceEnabled()) { 555 traceString = new StringBuilder(). 556 append("read(arr.length=").append(arr.length). 557 append(", off=").append(off). 558 append(", len=").append(len). 559 append(", filename=").append(filename). 560 append(", block=").append(block). 561 append(", canSkipChecksum=").append(canSkipChecksum). 562 append(")").toString(); 563 LOG.trace(traceString + ": starting"); 564 } 565 try { 566 if (canSkipChecksum && zeroReadaheadRequested) { 567 nRead = readWithoutBounceBuffer(arr, off, len); 568 } else { 569 nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); 570 } 571 } catch (IOException e) { 572 if (LOG.isTraceEnabled()) { 573 LOG.trace(traceString + ": I/O error", e); 574 } 575 throw e; 576 } 577 if (LOG.isTraceEnabled()) { 578 LOG.trace(traceString + ": returning " + nRead); 579 } 580 } finally { 581 if (canSkipChecksum) releaseNoChecksumContext(); 582 } 583 return nRead; 584 } 585 readWithoutBounceBuffer(byte arr[], int off, int len)586 private synchronized int readWithoutBounceBuffer(byte arr[], int off, 587 int len) throws IOException { 588 freeDataBufIfExists(); 589 freeChecksumBufIfExists(); 590 int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); 591 if (nRead > 0) { 592 dataPos += nRead; 593 } else if ((nRead == 0) && (dataPos == dataIn.size())) { 594 return -1; 595 } 596 return nRead; 597 } 598 readWithBounceBuffer(byte arr[], int off, int len, boolean canSkipChecksum)599 private synchronized int readWithBounceBuffer(byte arr[], int off, int len, 600 boolean canSkipChecksum) throws IOException { 601 createDataBufIfNeeded(); 602 if (!dataBuf.hasRemaining()) { 603 dataBuf.position(0); 604 dataBuf.limit(maxReadaheadLength); 605 fillDataBuf(canSkipChecksum); 606 } 607 if (dataBuf.remaining() == 0) return -1; 608 int toRead = Math.min(dataBuf.remaining(), len); 609 dataBuf.get(arr, off, toRead); 610 return toRead; 611 } 612 613 @Override skip(long n)614 public synchronized long skip(long n) throws IOException { 615 int discardedFromBuf = 0; 616 long remaining = n; 617 if ((dataBuf != null) && dataBuf.hasRemaining()) { 618 discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); 619 dataBuf.position(dataBuf.position() + discardedFromBuf); 620 remaining -= discardedFromBuf; 621 } 622 if (LOG.isTraceEnabled()) { 623 LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + 624 filename + "): discarded " + discardedFromBuf + " bytes from " + 625 "dataBuf and advanced dataPos by " + remaining); 626 } 627 dataPos += remaining; 628 return n; 629 } 630 631 @Override available()632 public int available() throws IOException { 633 // We never do network I/O in BlockReaderLocal. 634 return Integer.MAX_VALUE; 635 } 636 637 @Override close()638 public synchronized void close() throws IOException { 639 if (closed) return; 640 closed = true; 641 if (LOG.isTraceEnabled()) { 642 LOG.trace("close(filename=" + filename + ", block=" + block + ")"); 643 } 644 replica.unref(); 645 freeDataBufIfExists(); 646 freeChecksumBufIfExists(); 647 } 648 649 @Override readFully(byte[] arr, int off, int len)650 public synchronized void readFully(byte[] arr, int off, int len) 651 throws IOException { 652 BlockReaderUtil.readFully(this, arr, off, len); 653 } 654 655 @Override readAll(byte[] buf, int off, int len)656 public synchronized int readAll(byte[] buf, int off, int len) 657 throws IOException { 658 return BlockReaderUtil.readAll(this, buf, off, len); 659 } 660 661 @Override isLocal()662 public boolean isLocal() { 663 return true; 664 } 665 666 @Override isShortCircuit()667 public boolean isShortCircuit() { 668 return true; 669 } 670 671 /** 672 * Get or create a memory map for this replica. 673 * 674 * There are two kinds of ClientMmap objects we could fetch here: one that 675 * will always read pre-checksummed data, and one that may read data that 676 * hasn't been checksummed. 677 * 678 * If we fetch the former, "safe" kind of ClientMmap, we have to increment 679 * the anchor count on the shared memory slot. This will tell the DataNode 680 * not to munlock the block until this ClientMmap is closed. 681 * If we fetch the latter, we don't bother with anchoring. 682 * 683 * @param opts The options to use, such as SKIP_CHECKSUMS. 684 * 685 * @return null on failure; the ClientMmap otherwise. 686 */ 687 @Override getClientMmap(EnumSet<ReadOption> opts)688 public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { 689 boolean anchor = verifyChecksum && 690 (opts.contains(ReadOption.SKIP_CHECKSUMS) == false); 691 if (anchor) { 692 if (!createNoChecksumContext()) { 693 if (LOG.isTraceEnabled()) { 694 LOG.trace("can't get an mmap for " + block + " of " + filename + 695 " since SKIP_CHECKSUMS was not given, " + 696 "we aren't skipping checksums, and the block is not mlocked."); 697 } 698 return null; 699 } 700 } 701 ClientMmap clientMmap = null; 702 try { 703 clientMmap = replica.getOrCreateClientMmap(anchor); 704 } finally { 705 if ((clientMmap == null) && anchor) { 706 releaseNoChecksumContext(); 707 } 708 } 709 return clientMmap; 710 } 711 712 @VisibleForTesting getVerifyChecksum()713 boolean getVerifyChecksum() { 714 return this.verifyChecksum; 715 } 716 717 @VisibleForTesting getMaxReadaheadLength()718 int getMaxReadaheadLength() { 719 return this.maxReadaheadLength; 720 } 721 722 /** 723 * Make the replica anchorable. Normally this can only be done by the 724 * DataNode. This method is only for testing. 725 */ 726 @VisibleForTesting forceAnchorable()727 void forceAnchorable() { 728 replica.getSlot().makeAnchorable(); 729 } 730 731 /** 732 * Make the replica unanchorable. Normally this can only be done by the 733 * DataNode. This method is only for testing. 734 */ 735 @VisibleForTesting forceUnanchorable()736 void forceUnanchorable() { 737 replica.getSlot().makeUnanchorable(); 738 } 739 } 740