1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; 21 22 import java.io.BufferedOutputStream; 23 import java.io.DataInputStream; 24 import java.io.DataOutputStream; 25 import java.io.FileInputStream; 26 import java.io.IOException; 27 import java.net.InetSocketAddress; 28 29 import org.apache.commons.lang.mutable.MutableBoolean; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 import org.apache.hadoop.classification.InterfaceAudience; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.StorageType; 35 import org.apache.hadoop.hdfs.net.DomainPeer; 36 import org.apache.hadoop.hdfs.net.Peer; 37 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 38 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 39 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 40 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 41 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 42 import org.apache.hadoop.hdfs.protocolPB.PBHelper; 43 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 44 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; 45 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 46 import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; 47 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; 48 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; 49 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; 50 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; 51 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; 52 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; 53 import org.apache.hadoop.io.IOUtils; 54 import org.apache.hadoop.ipc.RemoteException; 55 import org.apache.hadoop.net.unix.DomainSocket; 56 import org.apache.hadoop.security.AccessControlException; 57 import org.apache.hadoop.security.UserGroupInformation; 58 import org.apache.hadoop.security.token.SecretManager.InvalidToken; 59 import org.apache.hadoop.security.token.Token; 60 import org.apache.hadoop.util.PerformanceAdvisory; 61 import org.apache.hadoop.util.Time; 62 63 import com.google.common.annotations.VisibleForTesting; 64 import com.google.common.base.Preconditions; 65 66 67 /** 68 * Utility class to create BlockReader implementations. 69 */ 70 @InterfaceAudience.Private 71 public class BlockReaderFactory implements ShortCircuitReplicaCreator { 72 static final Log LOG = LogFactory.getLog(BlockReaderFactory.class); 73 74 public static class FailureInjector { injectRequestFileDescriptorsFailure()75 public void injectRequestFileDescriptorsFailure() throws IOException { 76 // do nothing 77 } getSupportsReceiptVerification()78 public boolean getSupportsReceiptVerification() { 79 return true; 80 } 81 } 82 83 @VisibleForTesting 84 static ShortCircuitReplicaCreator 85 createShortCircuitReplicaInfoCallback = null; 86 87 private final DFSClient.Conf conf; 88 89 /** 90 * Injects failures into specific operations during unit tests. 91 */ 92 private final FailureInjector failureInjector; 93 94 /** 95 * The file name, for logging and debugging purposes. 96 */ 97 private String fileName; 98 99 /** 100 * The block ID and block pool ID to use. 101 */ 102 private ExtendedBlock block; 103 104 /** 105 * The block token to use for security purposes. 106 */ 107 private Token<BlockTokenIdentifier> token; 108 109 /** 110 * The offset within the block to start reading at. 111 */ 112 private long startOffset; 113 114 /** 115 * If false, we won't try to verify the block checksum. 116 */ 117 private boolean verifyChecksum; 118 119 /** 120 * The name of this client. 121 */ 122 private String clientName; 123 124 /** 125 * The DataNode we're talking to. 126 */ 127 private DatanodeInfo datanode; 128 129 /** 130 * StorageType of replica on DataNode. 131 */ 132 private StorageType storageType; 133 134 /** 135 * If false, we won't try short-circuit local reads. 136 */ 137 private boolean allowShortCircuitLocalReads; 138 139 /** 140 * The ClientContext to use for things like the PeerCache. 141 */ 142 private ClientContext clientContext; 143 144 /** 145 * Number of bytes to read. -1 indicates no limit. 146 */ 147 private long length = -1; 148 149 /** 150 * Caching strategy to use when reading the block. 151 */ 152 private CachingStrategy cachingStrategy; 153 154 /** 155 * Socket address to use to connect to peer. 156 */ 157 private InetSocketAddress inetSocketAddress; 158 159 /** 160 * Remote peer factory to use to create a peer, if needed. 161 */ 162 private RemotePeerFactory remotePeerFactory; 163 164 /** 165 * UserGroupInformation to use for legacy block reader local objects, if needed. 166 */ 167 private UserGroupInformation userGroupInformation; 168 169 /** 170 * Configuration to use for legacy block reader local objects, if needed. 171 */ 172 private Configuration configuration; 173 174 /** 175 * Information about the domain socket path we should use to connect to the 176 * local peer-- or null if we haven't examined the local domain socket. 177 */ 178 private DomainSocketFactory.PathInfo pathInfo; 179 180 /** 181 * The remaining number of times that we'll try to pull a socket out of the 182 * cache. 183 */ 184 private int remainingCacheTries; 185 BlockReaderFactory(DFSClient.Conf conf)186 public BlockReaderFactory(DFSClient.Conf conf) { 187 this.conf = conf; 188 this.failureInjector = conf.brfFailureInjector; 189 this.remainingCacheTries = conf.nCachedConnRetry; 190 } 191 setFileName(String fileName)192 public BlockReaderFactory setFileName(String fileName) { 193 this.fileName = fileName; 194 return this; 195 } 196 setBlock(ExtendedBlock block)197 public BlockReaderFactory setBlock(ExtendedBlock block) { 198 this.block = block; 199 return this; 200 } 201 setBlockToken(Token<BlockTokenIdentifier> token)202 public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) { 203 this.token = token; 204 return this; 205 } 206 setStartOffset(long startOffset)207 public BlockReaderFactory setStartOffset(long startOffset) { 208 this.startOffset = startOffset; 209 return this; 210 } 211 setVerifyChecksum(boolean verifyChecksum)212 public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) { 213 this.verifyChecksum = verifyChecksum; 214 return this; 215 } 216 setClientName(String clientName)217 public BlockReaderFactory setClientName(String clientName) { 218 this.clientName = clientName; 219 return this; 220 } 221 setDatanodeInfo(DatanodeInfo datanode)222 public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) { 223 this.datanode = datanode; 224 return this; 225 } 226 setStorageType(StorageType storageType)227 public BlockReaderFactory setStorageType(StorageType storageType) { 228 this.storageType = storageType; 229 return this; 230 } 231 setAllowShortCircuitLocalReads( boolean allowShortCircuitLocalReads)232 public BlockReaderFactory setAllowShortCircuitLocalReads( 233 boolean allowShortCircuitLocalReads) { 234 this.allowShortCircuitLocalReads = allowShortCircuitLocalReads; 235 return this; 236 } 237 setClientCacheContext( ClientContext clientContext)238 public BlockReaderFactory setClientCacheContext( 239 ClientContext clientContext) { 240 this.clientContext = clientContext; 241 return this; 242 } 243 setLength(long length)244 public BlockReaderFactory setLength(long length) { 245 this.length = length; 246 return this; 247 } 248 setCachingStrategy( CachingStrategy cachingStrategy)249 public BlockReaderFactory setCachingStrategy( 250 CachingStrategy cachingStrategy) { 251 this.cachingStrategy = cachingStrategy; 252 return this; 253 } 254 setInetSocketAddress( InetSocketAddress inetSocketAddress)255 public BlockReaderFactory setInetSocketAddress ( 256 InetSocketAddress inetSocketAddress) { 257 this.inetSocketAddress = inetSocketAddress; 258 return this; 259 } 260 setUserGroupInformation( UserGroupInformation userGroupInformation)261 public BlockReaderFactory setUserGroupInformation( 262 UserGroupInformation userGroupInformation) { 263 this.userGroupInformation = userGroupInformation; 264 return this; 265 } 266 setRemotePeerFactory( RemotePeerFactory remotePeerFactory)267 public BlockReaderFactory setRemotePeerFactory( 268 RemotePeerFactory remotePeerFactory) { 269 this.remotePeerFactory = remotePeerFactory; 270 return this; 271 } 272 setConfiguration( Configuration configuration)273 public BlockReaderFactory setConfiguration( 274 Configuration configuration) { 275 this.configuration = configuration; 276 return this; 277 } 278 279 /** 280 * Build a BlockReader with the given options. 281 * 282 * This function will do the best it can to create a block reader that meets 283 * all of our requirements. We prefer short-circuit block readers 284 * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the 285 * former avoid the overhead of socket communication. If short-circuit is 286 * unavailable, our next fallback is data transfer over UNIX domain sockets, 287 * if dfs.client.domain.socket.data.traffic has been enabled. If that doesn't 288 * work, we will try to create a remote block reader that operates over TCP 289 * sockets. 290 * 291 * There are a few caches that are important here. 292 * 293 * The ShortCircuitCache stores file descriptor objects which have been passed 294 * from the DataNode. 295 * 296 * The DomainSocketFactory stores information about UNIX domain socket paths 297 * that we not been able to use in the past, so that we don't waste time 298 * retrying them over and over. (Like all the caches, it does have a timeout, 299 * though.) 300 * 301 * The PeerCache stores peers that we have used in the past. If we can reuse 302 * one of these peers, we avoid the overhead of re-opening a socket. However, 303 * if the socket has been timed out on the remote end, our attempt to reuse 304 * the socket may end with an IOException. For that reason, we limit our 305 * attempts at socket reuse to dfs.client.cached.conn.retry times. After 306 * that, we create new sockets. This avoids the problem where a thread tries 307 * to talk to a peer that it hasn't talked to in a while, and has to clean out 308 * every entry in a socket cache full of stale entries. 309 * 310 * @return The new BlockReader. We will not return null. 311 * 312 * @throws InvalidToken 313 * If the block token was invalid. 314 * InvalidEncryptionKeyException 315 * If the encryption key was invalid. 316 * Other IOException 317 * If there was another problem. 318 */ build()319 public BlockReader build() throws IOException { 320 BlockReader reader = null; 321 322 Preconditions.checkNotNull(configuration); 323 if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) { 324 if (clientContext.getUseLegacyBlockReaderLocal()) { 325 reader = getLegacyBlockReaderLocal(); 326 if (reader != null) { 327 if (LOG.isTraceEnabled()) { 328 LOG.trace(this + ": returning new legacy block reader local."); 329 } 330 return reader; 331 } 332 } else { 333 reader = getBlockReaderLocal(); 334 if (reader != null) { 335 if (LOG.isTraceEnabled()) { 336 LOG.trace(this + ": returning new block reader local."); 337 } 338 return reader; 339 } 340 } 341 } 342 if (conf.domainSocketDataTraffic) { 343 reader = getRemoteBlockReaderFromDomain(); 344 if (reader != null) { 345 if (LOG.isTraceEnabled()) { 346 LOG.trace(this + ": returning new remote block reader using " + 347 "UNIX domain socket on " + pathInfo.getPath()); 348 } 349 return reader; 350 } 351 } 352 Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting, 353 "TCP reads were disabled for testing, but we failed to " + 354 "do a non-TCP read."); 355 return getRemoteBlockReaderFromTcp(); 356 } 357 358 /** 359 * Get {@link BlockReaderLocalLegacy} for short circuited local reads. 360 * This block reader implements the path-based style of local reads 361 * first introduced in HDFS-2246. 362 */ getLegacyBlockReaderLocal()363 private BlockReader getLegacyBlockReaderLocal() throws IOException { 364 if (LOG.isTraceEnabled()) { 365 LOG.trace(this + ": trying to construct BlockReaderLocalLegacy"); 366 } 367 if (!DFSClient.isLocalAddress(inetSocketAddress)) { 368 if (LOG.isTraceEnabled()) { 369 LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " + 370 "the address " + inetSocketAddress + " is not local"); 371 } 372 return null; 373 } 374 if (clientContext.getDisableLegacyBlockReaderLocal()) { 375 PerformanceAdvisory.LOG.debug(this + ": can't construct " + 376 "BlockReaderLocalLegacy because " + 377 "disableLegacyBlockReaderLocal is set."); 378 return null; 379 } 380 IOException ioe = null; 381 try { 382 return BlockReaderLocalLegacy.newBlockReader(conf, 383 userGroupInformation, configuration, fileName, block, token, 384 datanode, startOffset, length, storageType); 385 } catch (RemoteException remoteException) { 386 ioe = remoteException.unwrapRemoteException( 387 InvalidToken.class, AccessControlException.class); 388 } catch (IOException e) { 389 ioe = e; 390 } 391 if ((!(ioe instanceof AccessControlException)) && 392 isSecurityException(ioe)) { 393 // Handle security exceptions. 394 // We do not handle AccessControlException here, since 395 // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate 396 // that the user is not in dfs.block.local-path-access.user, a condition 397 // which requires us to disable legacy SCR. 398 throw ioe; 399 } 400 LOG.warn(this + ": error creating legacy BlockReaderLocal. " + 401 "Disabling legacy local reads.", ioe); 402 clientContext.setDisableLegacyBlockReaderLocal(); 403 return null; 404 } 405 getBlockReaderLocal()406 private BlockReader getBlockReaderLocal() throws InvalidToken { 407 if (LOG.isTraceEnabled()) { 408 LOG.trace(this + ": trying to construct a BlockReaderLocal " + 409 "for short-circuit reads."); 410 } 411 if (pathInfo == null) { 412 pathInfo = clientContext.getDomainSocketFactory(). 413 getPathInfo(inetSocketAddress, conf); 414 } 415 if (!pathInfo.getPathState().getUsableForShortCircuit()) { 416 PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " + 417 "usable for short circuit; giving up on BlockReaderLocal."); 418 return null; 419 } 420 ShortCircuitCache cache = clientContext.getShortCircuitCache(); 421 ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); 422 ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); 423 InvalidToken exc = info.getInvalidTokenException(); 424 if (exc != null) { 425 if (LOG.isTraceEnabled()) { 426 LOG.trace(this + ": got InvalidToken exception while trying to " + 427 "construct BlockReaderLocal via " + pathInfo.getPath()); 428 } 429 throw exc; 430 } 431 if (info.getReplica() == null) { 432 if (LOG.isTraceEnabled()) { 433 PerformanceAdvisory.LOG.debug(this + ": failed to get " + 434 "ShortCircuitReplica. Cannot construct " + 435 "BlockReaderLocal via " + pathInfo.getPath()); 436 } 437 return null; 438 } 439 return new BlockReaderLocal.Builder(conf). 440 setFilename(fileName). 441 setBlock(block). 442 setStartOffset(startOffset). 443 setShortCircuitReplica(info.getReplica()). 444 setVerifyChecksum(verifyChecksum). 445 setCachingStrategy(cachingStrategy). 446 setStorageType(storageType). 447 build(); 448 } 449 450 /** 451 * Fetch a pair of short-circuit block descriptors from a local DataNode. 452 * 453 * @return Null if we could not communicate with the datanode, 454 * a new ShortCircuitReplicaInfo object otherwise. 455 * ShortCircuitReplicaInfo objects may contain either an InvalidToken 456 * exception, or a ShortCircuitReplica object ready to use. 457 */ 458 @Override createShortCircuitReplicaInfo()459 public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { 460 if (createShortCircuitReplicaInfoCallback != null) { 461 ShortCircuitReplicaInfo info = 462 createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo(); 463 if (info != null) return info; 464 } 465 if (LOG.isTraceEnabled()) { 466 LOG.trace(this + ": trying to create ShortCircuitReplicaInfo."); 467 } 468 BlockReaderPeer curPeer; 469 while (true) { 470 curPeer = nextDomainPeer(); 471 if (curPeer == null) break; 472 if (curPeer.fromCache) remainingCacheTries--; 473 DomainPeer peer = (DomainPeer)curPeer.peer; 474 Slot slot = null; 475 ShortCircuitCache cache = clientContext.getShortCircuitCache(); 476 try { 477 MutableBoolean usedPeer = new MutableBoolean(false); 478 slot = cache.allocShmSlot(datanode, peer, usedPeer, 479 new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()), 480 clientName); 481 if (usedPeer.booleanValue()) { 482 if (LOG.isTraceEnabled()) { 483 LOG.trace(this + ": allocShmSlot used up our previous socket " + 484 peer.getDomainSocket() + ". Allocating a new one..."); 485 } 486 curPeer = nextDomainPeer(); 487 if (curPeer == null) break; 488 peer = (DomainPeer)curPeer.peer; 489 } 490 ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot); 491 clientContext.getPeerCache().put(datanode, peer); 492 return info; 493 } catch (IOException e) { 494 if (slot != null) { 495 cache.freeSlot(slot); 496 } 497 if (curPeer.fromCache) { 498 // Handle an I/O error we got when using a cached socket. 499 // These are considered less serious, because the socket may be stale. 500 if (LOG.isDebugEnabled()) { 501 LOG.debug(this + ": closing stale domain peer " + peer, e); 502 } 503 IOUtils.cleanup(LOG, peer); 504 } else { 505 // Handle an I/O error we got when using a newly created socket. 506 // We temporarily disable the domain socket path for a few minutes in 507 // this case, to prevent wasting more time on it. 508 LOG.warn(this + ": I/O error requesting file descriptors. " + 509 "Disabling domain socket " + peer.getDomainSocket(), e); 510 IOUtils.cleanup(LOG, peer); 511 clientContext.getDomainSocketFactory() 512 .disableDomainSocketPath(pathInfo.getPath()); 513 return null; 514 } 515 } 516 } 517 return null; 518 } 519 520 /** 521 * Request file descriptors from a DomainPeer. 522 * 523 * @param peer The peer to use for communication. 524 * @param slot If non-null, the shared memory slot to associate with the 525 * new ShortCircuitReplica. 526 * 527 * @return A ShortCircuitReplica object if we could communicate with the 528 * datanode; null, otherwise. 529 * @throws IOException If we encountered an I/O exception while communicating 530 * with the datanode. 531 */ requestFileDescriptors(DomainPeer peer, Slot slot)532 private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, 533 Slot slot) throws IOException { 534 ShortCircuitCache cache = clientContext.getShortCircuitCache(); 535 final DataOutputStream out = 536 new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); 537 SlotId slotId = slot == null ? null : slot.getSlotId(); 538 new Sender(out).requestShortCircuitFds(block, token, slotId, 1, 539 failureInjector.getSupportsReceiptVerification()); 540 DataInputStream in = new DataInputStream(peer.getInputStream()); 541 BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( 542 PBHelper.vintPrefixed(in)); 543 DomainSocket sock = peer.getDomainSocket(); 544 failureInjector.injectRequestFileDescriptorsFailure(); 545 switch (resp.getStatus()) { 546 case SUCCESS: 547 byte buf[] = new byte[1]; 548 FileInputStream fis[] = new FileInputStream[2]; 549 sock.recvFileInputStreams(fis, buf, 0, buf.length); 550 ShortCircuitReplica replica = null; 551 try { 552 ExtendedBlockId key = 553 new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); 554 if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { 555 LOG.trace("Sending receipt verification byte for slot " + slot); 556 sock.getOutputStream().write(0); 557 } 558 replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, 559 Time.monotonicNow(), slot); 560 return new ShortCircuitReplicaInfo(replica); 561 } catch (IOException e) { 562 // This indicates an error reading from disk, or a format error. Since 563 // it's not a socket communication problem, we return null rather than 564 // throwing an exception. 565 LOG.warn(this + ": error creating ShortCircuitReplica.", e); 566 return null; 567 } finally { 568 if (replica == null) { 569 IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); 570 } 571 } 572 case ERROR_UNSUPPORTED: 573 if (!resp.hasShortCircuitAccessVersion()) { 574 LOG.warn("short-circuit read access is disabled for " + 575 "DataNode " + datanode + ". reason: " + resp.getMessage()); 576 clientContext.getDomainSocketFactory() 577 .disableShortCircuitForPath(pathInfo.getPath()); 578 } else { 579 LOG.warn("short-circuit read access for the file " + 580 fileName + " is disabled for DataNode " + datanode + 581 ". reason: " + resp.getMessage()); 582 } 583 return null; 584 case ERROR_ACCESS_TOKEN: 585 String msg = "access control error while " + 586 "attempting to set up short-circuit access to " + 587 fileName + resp.getMessage(); 588 if (LOG.isDebugEnabled()) { 589 LOG.debug(this + ":" + msg); 590 } 591 return new ShortCircuitReplicaInfo(new InvalidToken(msg)); 592 default: 593 LOG.warn(this + ": unknown response code " + resp.getStatus() + 594 " while attempting to set up short-circuit access. " + 595 resp.getMessage()); 596 clientContext.getDomainSocketFactory() 597 .disableShortCircuitForPath(pathInfo.getPath()); 598 return null; 599 } 600 } 601 602 /** 603 * Get a RemoteBlockReader that communicates over a UNIX domain socket. 604 * 605 * @return The new BlockReader, or null if we failed to create the block 606 * reader. 607 * 608 * @throws InvalidToken If the block token was invalid. 609 * Potentially other security-related execptions. 610 */ getRemoteBlockReaderFromDomain()611 private BlockReader getRemoteBlockReaderFromDomain() throws IOException { 612 if (pathInfo == null) { 613 pathInfo = clientContext.getDomainSocketFactory(). 614 getPathInfo(inetSocketAddress, conf); 615 } 616 if (!pathInfo.getPathState().getUsableForDataTransfer()) { 617 PerformanceAdvisory.LOG.debug(this + ": not trying to create a " + 618 "remote block reader because the UNIX domain socket at " + 619 pathInfo + " is not usable."); 620 return null; 621 } 622 if (LOG.isTraceEnabled()) { 623 LOG.trace(this + ": trying to create a remote block reader from the " + 624 "UNIX domain socket at " + pathInfo.getPath()); 625 } 626 627 while (true) { 628 BlockReaderPeer curPeer = nextDomainPeer(); 629 if (curPeer == null) break; 630 if (curPeer.fromCache) remainingCacheTries--; 631 DomainPeer peer = (DomainPeer)curPeer.peer; 632 BlockReader blockReader = null; 633 try { 634 blockReader = getRemoteBlockReader(peer); 635 return blockReader; 636 } catch (IOException ioe) { 637 IOUtils.cleanup(LOG, peer); 638 if (isSecurityException(ioe)) { 639 if (LOG.isTraceEnabled()) { 640 LOG.trace(this + ": got security exception while constructing " + 641 "a remote block reader from the unix domain socket at " + 642 pathInfo.getPath(), ioe); 643 } 644 throw ioe; 645 } 646 if (curPeer.fromCache) { 647 // Handle an I/O error we got when using a cached peer. These are 648 // considered less serious, because the underlying socket may be stale. 649 if (LOG.isDebugEnabled()) { 650 LOG.debug("Closed potentially stale domain peer " + peer, ioe); 651 } 652 } else { 653 // Handle an I/O error we got when using a newly created domain peer. 654 // We temporarily disable the domain socket path for a few minutes in 655 // this case, to prevent wasting more time on it. 656 LOG.warn("I/O error constructing remote block reader. Disabling " + 657 "domain socket " + peer.getDomainSocket(), ioe); 658 clientContext.getDomainSocketFactory() 659 .disableDomainSocketPath(pathInfo.getPath()); 660 return null; 661 } 662 } finally { 663 if (blockReader == null) { 664 IOUtils.cleanup(LOG, peer); 665 } 666 } 667 } 668 return null; 669 } 670 671 /** 672 * Get a RemoteBlockReader that communicates over a TCP socket. 673 * 674 * @return The new BlockReader. We will not return null, but instead throw 675 * an exception if this fails. 676 * 677 * @throws InvalidToken 678 * If the block token was invalid. 679 * InvalidEncryptionKeyException 680 * If the encryption key was invalid. 681 * Other IOException 682 * If there was another problem. 683 */ getRemoteBlockReaderFromTcp()684 private BlockReader getRemoteBlockReaderFromTcp() throws IOException { 685 if (LOG.isTraceEnabled()) { 686 LOG.trace(this + ": trying to create a remote block reader from a " + 687 "TCP socket"); 688 } 689 BlockReader blockReader = null; 690 while (true) { 691 BlockReaderPeer curPeer = null; 692 Peer peer = null; 693 try { 694 curPeer = nextTcpPeer(); 695 if (curPeer.fromCache) remainingCacheTries--; 696 peer = curPeer.peer; 697 blockReader = getRemoteBlockReader(peer); 698 return blockReader; 699 } catch (IOException ioe) { 700 if (isSecurityException(ioe)) { 701 if (LOG.isTraceEnabled()) { 702 LOG.trace(this + ": got security exception while constructing " + 703 "a remote block reader from " + peer, ioe); 704 } 705 throw ioe; 706 } 707 if ((curPeer != null) && curPeer.fromCache) { 708 // Handle an I/O error we got when using a cached peer. These are 709 // considered less serious, because the underlying socket may be 710 // stale. 711 if (LOG.isDebugEnabled()) { 712 LOG.debug("Closed potentially stale remote peer " + peer, ioe); 713 } 714 } else { 715 // Handle an I/O error we got when using a newly created peer. 716 LOG.warn("I/O error constructing remote block reader.", ioe); 717 throw ioe; 718 } 719 } finally { 720 if (blockReader == null) { 721 IOUtils.cleanup(LOG, peer); 722 } 723 } 724 } 725 } 726 727 public static class BlockReaderPeer { 728 final Peer peer; 729 final boolean fromCache; 730 BlockReaderPeer(Peer peer, boolean fromCache)731 BlockReaderPeer(Peer peer, boolean fromCache) { 732 this.peer = peer; 733 this.fromCache = fromCache; 734 } 735 } 736 737 /** 738 * Get the next DomainPeer-- either from the cache or by creating it. 739 * 740 * @return the next DomainPeer, or null if we could not construct one. 741 */ nextDomainPeer()742 private BlockReaderPeer nextDomainPeer() { 743 if (remainingCacheTries > 0) { 744 Peer peer = clientContext.getPeerCache().get(datanode, true); 745 if (peer != null) { 746 if (LOG.isTraceEnabled()) { 747 LOG.trace("nextDomainPeer: reusing existing peer " + peer); 748 } 749 return new BlockReaderPeer(peer, true); 750 } 751 } 752 DomainSocket sock = clientContext.getDomainSocketFactory(). 753 createSocket(pathInfo, conf.socketTimeout); 754 if (sock == null) return null; 755 return new BlockReaderPeer(new DomainPeer(sock), false); 756 } 757 758 /** 759 * Get the next TCP-based peer-- either from the cache or by creating it. 760 * 761 * @return the next Peer, or null if we could not construct one. 762 * 763 * @throws IOException If there was an error while constructing the peer 764 * (such as an InvalidEncryptionKeyException) 765 */ nextTcpPeer()766 private BlockReaderPeer nextTcpPeer() throws IOException { 767 if (remainingCacheTries > 0) { 768 Peer peer = clientContext.getPeerCache().get(datanode, false); 769 if (peer != null) { 770 if (LOG.isTraceEnabled()) { 771 LOG.trace("nextTcpPeer: reusing existing peer " + peer); 772 } 773 return new BlockReaderPeer(peer, true); 774 } 775 } 776 try { 777 Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token, 778 datanode); 779 if (LOG.isTraceEnabled()) { 780 LOG.trace("nextTcpPeer: created newConnectedPeer " + peer); 781 } 782 return new BlockReaderPeer(peer, false); 783 } catch (IOException e) { 784 if (LOG.isTraceEnabled()) { 785 LOG.trace("nextTcpPeer: failed to create newConnectedPeer " + 786 "connected to " + datanode); 787 } 788 throw e; 789 } 790 } 791 792 /** 793 * Determine if an exception is security-related. 794 * 795 * We need to handle these exceptions differently than other IOExceptions. 796 * They don't indicate a communication problem. Instead, they mean that there 797 * is some action the client needs to take, such as refetching block tokens, 798 * renewing encryption keys, etc. 799 * 800 * @param ioe The exception 801 * @return True only if the exception is security-related. 802 */ isSecurityException(IOException ioe)803 private static boolean isSecurityException(IOException ioe) { 804 return (ioe instanceof InvalidToken) || 805 (ioe instanceof InvalidEncryptionKeyException) || 806 (ioe instanceof InvalidBlockTokenException) || 807 (ioe instanceof AccessControlException); 808 } 809 810 @SuppressWarnings("deprecation") getRemoteBlockReader(Peer peer)811 private BlockReader getRemoteBlockReader(Peer peer) throws IOException { 812 if (conf.useLegacyBlockReader) { 813 return RemoteBlockReader.newBlockReader(fileName, 814 block, token, startOffset, length, conf.ioBufferSize, 815 verifyChecksum, clientName, peer, datanode, 816 clientContext.getPeerCache(), cachingStrategy); 817 } else { 818 return RemoteBlockReader2.newBlockReader( 819 fileName, block, token, startOffset, length, 820 verifyChecksum, clientName, peer, datanode, 821 clientContext.getPeerCache(), cachingStrategy); 822 } 823 } 824 825 @Override toString()826 public String toString() { 827 return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")"; 828 } 829 830 /** 831 * File name to print when accessing a block directly (from servlets) 832 * @param s Address of the block location 833 * @param poolId Block pool ID of the block 834 * @param blockId Block ID of the block 835 * @return string that has a file name for debug purposes 836 */ getFileName(final InetSocketAddress s, final String poolId, final long blockId)837 public static String getFileName(final InetSocketAddress s, 838 final String poolId, final long blockId) { 839 return s.toString() + ":" + poolId + ":" + blockId; 840 } 841 } 842