1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
21 
22 import java.io.BufferedOutputStream;
23 import java.io.DataInputStream;
24 import java.io.DataOutputStream;
25 import java.io.FileInputStream;
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 
29 import org.apache.commons.lang.mutable.MutableBoolean;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.StorageType;
35 import org.apache.hadoop.hdfs.net.DomainPeer;
36 import org.apache.hadoop.hdfs.net.Peer;
37 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
38 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
39 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
40 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
41 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
42 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
43 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
44 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
45 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
46 import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
47 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
48 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
49 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
50 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
51 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
52 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
53 import org.apache.hadoop.io.IOUtils;
54 import org.apache.hadoop.ipc.RemoteException;
55 import org.apache.hadoop.net.unix.DomainSocket;
56 import org.apache.hadoop.security.AccessControlException;
57 import org.apache.hadoop.security.UserGroupInformation;
58 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
59 import org.apache.hadoop.security.token.Token;
60 import org.apache.hadoop.util.PerformanceAdvisory;
61 import org.apache.hadoop.util.Time;
62 
63 import com.google.common.annotations.VisibleForTesting;
64 import com.google.common.base.Preconditions;
65 
66 
67 /**
68  * Utility class to create BlockReader implementations.
69  */
70 @InterfaceAudience.Private
71 public class BlockReaderFactory implements ShortCircuitReplicaCreator {
  /** Logger for this class; also handed to IOUtils.cleanup when closing peers. */
  static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);

  /**
   * Hook that lets unit tests alter the short-circuit file descriptor
   * request path.  The default implementation injects nothing.
   */
  public static class FailureInjector {
    /**
     * Invoked in requestFileDescriptors after the DataNode's response has
     * been parsed and before its status is handled.  Does nothing by
     * default; a test subclass may throw here to simulate a failure.
     */
    public void injectRequestFileDescriptorsFailure() throws IOException {
      // do nothing
    }
    /**
     * The value passed to Sender#requestShortCircuitFds as the
     * receipt-verification flag when requesting file descriptors.
     *
     * @return true by default.
     */
    public boolean getSupportsReceiptVerification() {
      return true;
    }
  }

  /**
   * Test-only hook.  When non-null, createShortCircuitReplicaInfo() first
   * asks this creator for a ShortCircuitReplicaInfo and returns its result
   * if that result is non-null.
   */
  @VisibleForTesting
  static ShortCircuitReplicaCreator
      createShortCircuitReplicaInfoCallback = null;
86 
  /**
   * Client configuration.  Supplies, among others, shortCircuitLocalReads,
   * domainSocketDataTraffic, nCachedConnRetry and the failure injector.
   */
  private final DFSClient.Conf conf;

  /**
   * Injects failures into specific operations during unit tests.
   * Taken from conf.brfFailureInjector.
   */
  private final FailureInjector failureInjector;

  /**
   * The file name, for logging and debugging purposes.
   */
  private String fileName;

  /**
   * The block ID and block pool ID to use.
   */
  private ExtendedBlock block;

  /**
   * The block token to use for security purposes.
   */
  private Token<BlockTokenIdentifier> token;

  /**
   * The offset within the block to start reading at.
   */
  private long startOffset;

  /**
   * If false, we won't try to verify the block checksum.
   */
  private boolean verifyChecksum;

  /**
   * The name of this client.
   */
  private String clientName;

  /**
   * The DataNode we're talking to.
   */
  private DatanodeInfo datanode;

  /**
   * StorageType of replica on DataNode.
   */
  private StorageType storageType;

  /**
   * If false, we won't try short-circuit local reads.
   */
  private boolean allowShortCircuitLocalReads;

  /**
   * The ClientContext to use for things like the PeerCache.
   */
  private ClientContext clientContext;

  /**
   * Number of bytes to read.  -1 indicates no limit.
   */
  private long length = -1;

  /**
   * Caching strategy to use when reading the block.
   */
  private CachingStrategy cachingStrategy;

  /**
   * Socket address to use to connect to peer.
   */
  private InetSocketAddress inetSocketAddress;

  /**
   * Remote peer factory to use to create a peer, if needed.
   */
  private RemotePeerFactory remotePeerFactory;

  /**
   * UserGroupInformation to use for legacy block reader local objects, if needed.
   */
  private UserGroupInformation userGroupInformation;

  /**
   * Configuration to use for legacy block reader local objects, if needed.
   */
  private Configuration configuration;

  /**
   * Information about the domain socket path we should use to connect to the
   * local peer-- or null if we haven't examined the local domain socket.
   */
  private DomainSocketFactory.PathInfo pathInfo;

  /**
   * The remaining number of times that we'll try to pull a socket out of the
   * cache.  Starts at conf.nCachedConnRetry and is decremented each time a
   * cached peer is used.
   */
  private int remainingCacheTries;
185 
BlockReaderFactory(DFSClient.Conf conf)186   public BlockReaderFactory(DFSClient.Conf conf) {
187     this.conf = conf;
188     this.failureInjector = conf.brfFailureInjector;
189     this.remainingCacheTries = conf.nCachedConnRetry;
190   }
191 
setFileName(String fileName)192   public BlockReaderFactory setFileName(String fileName) {
193     this.fileName = fileName;
194     return this;
195   }
196 
setBlock(ExtendedBlock block)197   public BlockReaderFactory setBlock(ExtendedBlock block) {
198     this.block = block;
199     return this;
200   }
201 
setBlockToken(Token<BlockTokenIdentifier> token)202   public BlockReaderFactory setBlockToken(Token<BlockTokenIdentifier> token) {
203     this.token = token;
204     return this;
205   }
206 
setStartOffset(long startOffset)207   public BlockReaderFactory setStartOffset(long startOffset) {
208     this.startOffset = startOffset;
209     return this;
210   }
211 
setVerifyChecksum(boolean verifyChecksum)212   public BlockReaderFactory setVerifyChecksum(boolean verifyChecksum) {
213     this.verifyChecksum = verifyChecksum;
214     return this;
215   }
216 
setClientName(String clientName)217   public BlockReaderFactory setClientName(String clientName) {
218     this.clientName = clientName;
219     return this;
220   }
221 
setDatanodeInfo(DatanodeInfo datanode)222   public BlockReaderFactory setDatanodeInfo(DatanodeInfo datanode) {
223     this.datanode = datanode;
224     return this;
225   }
226 
setStorageType(StorageType storageType)227   public BlockReaderFactory setStorageType(StorageType storageType) {
228     this.storageType = storageType;
229     return this;
230   }
231 
setAllowShortCircuitLocalReads( boolean allowShortCircuitLocalReads)232   public BlockReaderFactory setAllowShortCircuitLocalReads(
233       boolean allowShortCircuitLocalReads) {
234     this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
235     return this;
236   }
237 
setClientCacheContext( ClientContext clientContext)238   public BlockReaderFactory setClientCacheContext(
239       ClientContext clientContext) {
240     this.clientContext = clientContext;
241     return this;
242   }
243 
setLength(long length)244   public BlockReaderFactory setLength(long length) {
245     this.length = length;
246     return this;
247   }
248 
setCachingStrategy( CachingStrategy cachingStrategy)249   public BlockReaderFactory setCachingStrategy(
250       CachingStrategy cachingStrategy) {
251     this.cachingStrategy = cachingStrategy;
252     return this;
253   }
254 
setInetSocketAddress( InetSocketAddress inetSocketAddress)255   public BlockReaderFactory setInetSocketAddress (
256       InetSocketAddress inetSocketAddress) {
257     this.inetSocketAddress = inetSocketAddress;
258     return this;
259   }
260 
setUserGroupInformation( UserGroupInformation userGroupInformation)261   public BlockReaderFactory setUserGroupInformation(
262       UserGroupInformation userGroupInformation) {
263     this.userGroupInformation = userGroupInformation;
264     return this;
265   }
266 
setRemotePeerFactory( RemotePeerFactory remotePeerFactory)267   public BlockReaderFactory setRemotePeerFactory(
268       RemotePeerFactory remotePeerFactory) {
269     this.remotePeerFactory = remotePeerFactory;
270     return this;
271   }
272 
setConfiguration( Configuration configuration)273   public BlockReaderFactory setConfiguration(
274       Configuration configuration) {
275     this.configuration = configuration;
276     return this;
277   }
278 
279   /**
280    * Build a BlockReader with the given options.
281    *
282    * This function will do the best it can to create a block reader that meets
283    * all of our requirements.  We prefer short-circuit block readers
284    * (BlockReaderLocal and BlockReaderLocalLegacy) over remote ones, since the
285    * former avoid the overhead of socket communication.  If short-circuit is
286    * unavailable, our next fallback is data transfer over UNIX domain sockets,
287    * if dfs.client.domain.socket.data.traffic has been enabled.  If that doesn't
288    * work, we will try to create a remote block reader that operates over TCP
289    * sockets.
290    *
291    * There are a few caches that are important here.
292    *
293    * The ShortCircuitCache stores file descriptor objects which have been passed
294    * from the DataNode.
295    *
296    * The DomainSocketFactory stores information about UNIX domain socket paths
297    * that we not been able to use in the past, so that we don't waste time
298    * retrying them over and over.  (Like all the caches, it does have a timeout,
299    * though.)
300    *
301    * The PeerCache stores peers that we have used in the past.  If we can reuse
302    * one of these peers, we avoid the overhead of re-opening a socket.  However,
303    * if the socket has been timed out on the remote end, our attempt to reuse
304    * the socket may end with an IOException.  For that reason, we limit our
305    * attempts at socket reuse to dfs.client.cached.conn.retry times.  After
306    * that, we create new sockets.  This avoids the problem where a thread tries
307    * to talk to a peer that it hasn't talked to in a while, and has to clean out
308    * every entry in a socket cache full of stale entries.
309    *
310    * @return The new BlockReader.  We will not return null.
311    *
312    * @throws InvalidToken
313    *             If the block token was invalid.
314    *         InvalidEncryptionKeyException
315    *             If the encryption key was invalid.
316    *         Other IOException
317    *             If there was another problem.
318    */
build()319   public BlockReader build() throws IOException {
320     BlockReader reader = null;
321 
322     Preconditions.checkNotNull(configuration);
323     if (conf.shortCircuitLocalReads && allowShortCircuitLocalReads) {
324       if (clientContext.getUseLegacyBlockReaderLocal()) {
325         reader = getLegacyBlockReaderLocal();
326         if (reader != null) {
327           if (LOG.isTraceEnabled()) {
328             LOG.trace(this + ": returning new legacy block reader local.");
329           }
330           return reader;
331         }
332       } else {
333         reader = getBlockReaderLocal();
334         if (reader != null) {
335           if (LOG.isTraceEnabled()) {
336             LOG.trace(this + ": returning new block reader local.");
337           }
338           return reader;
339         }
340       }
341     }
342     if (conf.domainSocketDataTraffic) {
343       reader = getRemoteBlockReaderFromDomain();
344       if (reader != null) {
345         if (LOG.isTraceEnabled()) {
346           LOG.trace(this + ": returning new remote block reader using " +
347               "UNIX domain socket on " + pathInfo.getPath());
348         }
349         return reader;
350       }
351     }
352     Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
353         "TCP reads were disabled for testing, but we failed to " +
354         "do a non-TCP read.");
355     return getRemoteBlockReaderFromTcp();
356   }
357 
358   /**
359    * Get {@link BlockReaderLocalLegacy} for short circuited local reads.
360    * This block reader implements the path-based style of local reads
361    * first introduced in HDFS-2246.
362    */
getLegacyBlockReaderLocal()363   private BlockReader getLegacyBlockReaderLocal() throws IOException {
364     if (LOG.isTraceEnabled()) {
365       LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
366     }
367     if (!DFSClient.isLocalAddress(inetSocketAddress)) {
368       if (LOG.isTraceEnabled()) {
369         LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
370             "the address " + inetSocketAddress + " is not local");
371       }
372       return null;
373     }
374     if (clientContext.getDisableLegacyBlockReaderLocal()) {
375       PerformanceAdvisory.LOG.debug(this + ": can't construct " +
376           "BlockReaderLocalLegacy because " +
377           "disableLegacyBlockReaderLocal is set.");
378       return null;
379     }
380     IOException ioe = null;
381     try {
382       return BlockReaderLocalLegacy.newBlockReader(conf,
383           userGroupInformation, configuration, fileName, block, token,
384           datanode, startOffset, length, storageType);
385     } catch (RemoteException remoteException) {
386       ioe = remoteException.unwrapRemoteException(
387                 InvalidToken.class, AccessControlException.class);
388     } catch (IOException e) {
389       ioe = e;
390     }
391     if ((!(ioe instanceof AccessControlException)) &&
392         isSecurityException(ioe)) {
393       // Handle security exceptions.
394       // We do not handle AccessControlException here, since
395       // BlockReaderLocalLegacy#newBlockReader uses that exception to indicate
396       // that the user is not in dfs.block.local-path-access.user, a condition
397       // which requires us to disable legacy SCR.
398       throw ioe;
399     }
400     LOG.warn(this + ": error creating legacy BlockReaderLocal.  " +
401         "Disabling legacy local reads.", ioe);
402     clientContext.setDisableLegacyBlockReaderLocal();
403     return null;
404   }
405 
getBlockReaderLocal()406   private BlockReader getBlockReaderLocal() throws InvalidToken {
407     if (LOG.isTraceEnabled()) {
408       LOG.trace(this + ": trying to construct a BlockReaderLocal " +
409           "for short-circuit reads.");
410     }
411     if (pathInfo == null) {
412       pathInfo = clientContext.getDomainSocketFactory().
413                       getPathInfo(inetSocketAddress, conf);
414     }
415     if (!pathInfo.getPathState().getUsableForShortCircuit()) {
416       PerformanceAdvisory.LOG.debug(this + ": " + pathInfo + " is not " +
417           "usable for short circuit; giving up on BlockReaderLocal.");
418       return null;
419     }
420     ShortCircuitCache cache = clientContext.getShortCircuitCache();
421     ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
422     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
423     InvalidToken exc = info.getInvalidTokenException();
424     if (exc != null) {
425       if (LOG.isTraceEnabled()) {
426         LOG.trace(this + ": got InvalidToken exception while trying to " +
427             "construct BlockReaderLocal via " + pathInfo.getPath());
428       }
429       throw exc;
430     }
431     if (info.getReplica() == null) {
432       if (LOG.isTraceEnabled()) {
433         PerformanceAdvisory.LOG.debug(this + ": failed to get " +
434             "ShortCircuitReplica. Cannot construct " +
435             "BlockReaderLocal via " + pathInfo.getPath());
436       }
437       return null;
438     }
439     return new BlockReaderLocal.Builder(conf).
440         setFilename(fileName).
441         setBlock(block).
442         setStartOffset(startOffset).
443         setShortCircuitReplica(info.getReplica()).
444         setVerifyChecksum(verifyChecksum).
445         setCachingStrategy(cachingStrategy).
446         setStorageType(storageType).
447         build();
448   }
449 
450   /**
451    * Fetch a pair of short-circuit block descriptors from a local DataNode.
452    *
453    * @return    Null if we could not communicate with the datanode,
454    *            a new ShortCircuitReplicaInfo object otherwise.
455    *            ShortCircuitReplicaInfo objects may contain either an InvalidToken
456    *            exception, or a ShortCircuitReplica object ready to use.
457    */
458   @Override
createShortCircuitReplicaInfo()459   public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
460     if (createShortCircuitReplicaInfoCallback != null) {
461       ShortCircuitReplicaInfo info =
462         createShortCircuitReplicaInfoCallback.createShortCircuitReplicaInfo();
463       if (info != null) return info;
464     }
465     if (LOG.isTraceEnabled()) {
466       LOG.trace(this + ": trying to create ShortCircuitReplicaInfo.");
467     }
468     BlockReaderPeer curPeer;
469     while (true) {
470       curPeer = nextDomainPeer();
471       if (curPeer == null) break;
472       if (curPeer.fromCache) remainingCacheTries--;
473       DomainPeer peer = (DomainPeer)curPeer.peer;
474       Slot slot = null;
475       ShortCircuitCache cache = clientContext.getShortCircuitCache();
476       try {
477         MutableBoolean usedPeer = new MutableBoolean(false);
478         slot = cache.allocShmSlot(datanode, peer, usedPeer,
479             new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
480             clientName);
481         if (usedPeer.booleanValue()) {
482           if (LOG.isTraceEnabled()) {
483             LOG.trace(this + ": allocShmSlot used up our previous socket " +
484               peer.getDomainSocket() + ".  Allocating a new one...");
485           }
486           curPeer = nextDomainPeer();
487           if (curPeer == null) break;
488           peer = (DomainPeer)curPeer.peer;
489         }
490         ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
491         clientContext.getPeerCache().put(datanode, peer);
492         return info;
493       } catch (IOException e) {
494         if (slot != null) {
495           cache.freeSlot(slot);
496         }
497         if (curPeer.fromCache) {
498           // Handle an I/O error we got when using a cached socket.
499           // These are considered less serious, because the socket may be stale.
500           if (LOG.isDebugEnabled()) {
501             LOG.debug(this + ": closing stale domain peer " + peer, e);
502           }
503           IOUtils.cleanup(LOG, peer);
504         } else {
505           // Handle an I/O error we got when using a newly created socket.
506           // We temporarily disable the domain socket path for a few minutes in
507           // this case, to prevent wasting more time on it.
508           LOG.warn(this + ": I/O error requesting file descriptors.  " +
509               "Disabling domain socket " + peer.getDomainSocket(), e);
510           IOUtils.cleanup(LOG, peer);
511           clientContext.getDomainSocketFactory()
512               .disableDomainSocketPath(pathInfo.getPath());
513           return null;
514         }
515       }
516     }
517     return null;
518   }
519 
520   /**
521    * Request file descriptors from a DomainPeer.
522    *
523    * @param peer   The peer to use for communication.
524    * @param slot   If non-null, the shared memory slot to associate with the
525    *               new ShortCircuitReplica.
526    *
527    * @return  A ShortCircuitReplica object if we could communicate with the
528    *          datanode; null, otherwise.
529    * @throws  IOException If we encountered an I/O exception while communicating
530    *          with the datanode.
531    */
requestFileDescriptors(DomainPeer peer, Slot slot)532   private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
533           Slot slot) throws IOException {
534     ShortCircuitCache cache = clientContext.getShortCircuitCache();
535     final DataOutputStream out =
536         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
537     SlotId slotId = slot == null ? null : slot.getSlotId();
538     new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
539         failureInjector.getSupportsReceiptVerification());
540     DataInputStream in = new DataInputStream(peer.getInputStream());
541     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
542         PBHelper.vintPrefixed(in));
543     DomainSocket sock = peer.getDomainSocket();
544     failureInjector.injectRequestFileDescriptorsFailure();
545     switch (resp.getStatus()) {
546     case SUCCESS:
547       byte buf[] = new byte[1];
548       FileInputStream fis[] = new FileInputStream[2];
549       sock.recvFileInputStreams(fis, buf, 0, buf.length);
550       ShortCircuitReplica replica = null;
551       try {
552         ExtendedBlockId key =
553             new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
554         if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
555           LOG.trace("Sending receipt verification byte for slot " + slot);
556           sock.getOutputStream().write(0);
557         }
558         replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
559             Time.monotonicNow(), slot);
560         return new ShortCircuitReplicaInfo(replica);
561       } catch (IOException e) {
562         // This indicates an error reading from disk, or a format error.  Since
563         // it's not a socket communication problem, we return null rather than
564         // throwing an exception.
565         LOG.warn(this + ": error creating ShortCircuitReplica.", e);
566         return null;
567       } finally {
568         if (replica == null) {
569           IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
570         }
571       }
572     case ERROR_UNSUPPORTED:
573       if (!resp.hasShortCircuitAccessVersion()) {
574         LOG.warn("short-circuit read access is disabled for " +
575             "DataNode " + datanode + ".  reason: " + resp.getMessage());
576         clientContext.getDomainSocketFactory()
577             .disableShortCircuitForPath(pathInfo.getPath());
578       } else {
579         LOG.warn("short-circuit read access for the file " +
580             fileName + " is disabled for DataNode " + datanode +
581             ".  reason: " + resp.getMessage());
582       }
583       return null;
584     case ERROR_ACCESS_TOKEN:
585       String msg = "access control error while " +
586           "attempting to set up short-circuit access to " +
587           fileName + resp.getMessage();
588       if (LOG.isDebugEnabled()) {
589         LOG.debug(this + ":" + msg);
590       }
591       return new ShortCircuitReplicaInfo(new InvalidToken(msg));
592     default:
593       LOG.warn(this + ": unknown response code " + resp.getStatus() +
594           " while attempting to set up short-circuit access. " +
595           resp.getMessage());
596       clientContext.getDomainSocketFactory()
597           .disableShortCircuitForPath(pathInfo.getPath());
598       return null;
599     }
600   }
601 
602   /**
603    * Get a RemoteBlockReader that communicates over a UNIX domain socket.
604    *
605    * @return The new BlockReader, or null if we failed to create the block
606    * reader.
607    *
608    * @throws InvalidToken    If the block token was invalid.
609    * Potentially other security-related execptions.
610    */
getRemoteBlockReaderFromDomain()611   private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
612     if (pathInfo == null) {
613       pathInfo = clientContext.getDomainSocketFactory().
614                       getPathInfo(inetSocketAddress, conf);
615     }
616     if (!pathInfo.getPathState().getUsableForDataTransfer()) {
617       PerformanceAdvisory.LOG.debug(this + ": not trying to create a " +
618           "remote block reader because the UNIX domain socket at " +
619           pathInfo + " is not usable.");
620       return null;
621     }
622     if (LOG.isTraceEnabled()) {
623       LOG.trace(this + ": trying to create a remote block reader from the " +
624           "UNIX domain socket at " + pathInfo.getPath());
625     }
626 
627     while (true) {
628       BlockReaderPeer curPeer = nextDomainPeer();
629       if (curPeer == null) break;
630       if (curPeer.fromCache) remainingCacheTries--;
631       DomainPeer peer = (DomainPeer)curPeer.peer;
632       BlockReader blockReader = null;
633       try {
634         blockReader = getRemoteBlockReader(peer);
635         return blockReader;
636       } catch (IOException ioe) {
637         IOUtils.cleanup(LOG, peer);
638         if (isSecurityException(ioe)) {
639           if (LOG.isTraceEnabled()) {
640             LOG.trace(this + ": got security exception while constructing " +
641                 "a remote block reader from the unix domain socket at " +
642                 pathInfo.getPath(), ioe);
643           }
644           throw ioe;
645         }
646         if (curPeer.fromCache) {
647           // Handle an I/O error we got when using a cached peer.  These are
648           // considered less serious, because the underlying socket may be stale.
649           if (LOG.isDebugEnabled()) {
650             LOG.debug("Closed potentially stale domain peer " + peer, ioe);
651           }
652         } else {
653           // Handle an I/O error we got when using a newly created domain peer.
654           // We temporarily disable the domain socket path for a few minutes in
655           // this case, to prevent wasting more time on it.
656           LOG.warn("I/O error constructing remote block reader.  Disabling " +
657               "domain socket " + peer.getDomainSocket(), ioe);
658           clientContext.getDomainSocketFactory()
659               .disableDomainSocketPath(pathInfo.getPath());
660           return null;
661         }
662       } finally {
663         if (blockReader == null) {
664           IOUtils.cleanup(LOG, peer);
665         }
666       }
667     }
668     return null;
669   }
670 
671   /**
672    * Get a RemoteBlockReader that communicates over a TCP socket.
673    *
674    * @return The new BlockReader.  We will not return null, but instead throw
675    *         an exception if this fails.
676    *
677    * @throws InvalidToken
678    *             If the block token was invalid.
679    *         InvalidEncryptionKeyException
680    *             If the encryption key was invalid.
681    *         Other IOException
682    *             If there was another problem.
683    */
getRemoteBlockReaderFromTcp()684   private BlockReader getRemoteBlockReaderFromTcp() throws IOException {
685     if (LOG.isTraceEnabled()) {
686       LOG.trace(this + ": trying to create a remote block reader from a " +
687           "TCP socket");
688     }
689     BlockReader blockReader = null;
690     while (true) {
691       BlockReaderPeer curPeer = null;
692       Peer peer = null;
693       try {
694         curPeer = nextTcpPeer();
695         if (curPeer.fromCache) remainingCacheTries--;
696         peer = curPeer.peer;
697         blockReader = getRemoteBlockReader(peer);
698         return blockReader;
699       } catch (IOException ioe) {
700         if (isSecurityException(ioe)) {
701           if (LOG.isTraceEnabled()) {
702             LOG.trace(this + ": got security exception while constructing " +
703                 "a remote block reader from " + peer, ioe);
704           }
705           throw ioe;
706         }
707         if ((curPeer != null) && curPeer.fromCache) {
708           // Handle an I/O error we got when using a cached peer.  These are
709           // considered less serious, because the underlying socket may be
710           // stale.
711           if (LOG.isDebugEnabled()) {
712             LOG.debug("Closed potentially stale remote peer " + peer, ioe);
713           }
714         } else {
715           // Handle an I/O error we got when using a newly created peer.
716           LOG.warn("I/O error constructing remote block reader.", ioe);
717           throw ioe;
718         }
719       } finally {
720         if (blockReader == null) {
721           IOUtils.cleanup(LOG, peer);
722         }
723       }
724     }
725   }
726 
  /**
   * A peer paired with a flag recording whether it was taken from the peer
   * cache (true) or newly created (false).
   */
  public static class BlockReaderPeer {
    final Peer peer;
    final boolean fromCache;

    BlockReaderPeer(Peer peer, boolean fromCache) {
      this.peer = peer;
      this.fromCache = fromCache;
    }
  }
736 
737   /**
738    * Get the next DomainPeer-- either from the cache or by creating it.
739    *
740    * @return the next DomainPeer, or null if we could not construct one.
741    */
nextDomainPeer()742   private BlockReaderPeer nextDomainPeer() {
743     if (remainingCacheTries > 0) {
744       Peer peer = clientContext.getPeerCache().get(datanode, true);
745       if (peer != null) {
746         if (LOG.isTraceEnabled()) {
747           LOG.trace("nextDomainPeer: reusing existing peer " + peer);
748         }
749         return new BlockReaderPeer(peer, true);
750       }
751     }
752     DomainSocket sock = clientContext.getDomainSocketFactory().
753         createSocket(pathInfo, conf.socketTimeout);
754     if (sock == null) return null;
755     return new BlockReaderPeer(new DomainPeer(sock), false);
756   }
757 
758   /**
759    * Get the next TCP-based peer-- either from the cache or by creating it.
760    *
761    * @return the next Peer, or null if we could not construct one.
762    *
763    * @throws IOException  If there was an error while constructing the peer
764    *                      (such as an InvalidEncryptionKeyException)
765    */
nextTcpPeer()766   private BlockReaderPeer nextTcpPeer() throws IOException {
767     if (remainingCacheTries > 0) {
768       Peer peer = clientContext.getPeerCache().get(datanode, false);
769       if (peer != null) {
770         if (LOG.isTraceEnabled()) {
771           LOG.trace("nextTcpPeer: reusing existing peer " + peer);
772         }
773         return new BlockReaderPeer(peer, true);
774       }
775     }
776     try {
777       Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
778         datanode);
779       if (LOG.isTraceEnabled()) {
780         LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
781       }
782       return new BlockReaderPeer(peer, false);
783     } catch (IOException e) {
784       if (LOG.isTraceEnabled()) {
785         LOG.trace("nextTcpPeer: failed to create newConnectedPeer " +
786                   "connected to " + datanode);
787       }
788       throw e;
789     }
790   }
791 
792   /**
793    * Determine if an exception is security-related.
794    *
795    * We need to handle these exceptions differently than other IOExceptions.
796    * They don't indicate a communication problem.  Instead, they mean that there
797    * is some action the client needs to take, such as refetching block tokens,
798    * renewing encryption keys, etc.
799    *
800    * @param ioe    The exception
801    * @return       True only if the exception is security-related.
802    */
isSecurityException(IOException ioe)803   private static boolean isSecurityException(IOException ioe) {
804     return (ioe instanceof InvalidToken) ||
805             (ioe instanceof InvalidEncryptionKeyException) ||
806             (ioe instanceof InvalidBlockTokenException) ||
807             (ioe instanceof AccessControlException);
808   }
809 
810   @SuppressWarnings("deprecation")
getRemoteBlockReader(Peer peer)811   private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
812     if (conf.useLegacyBlockReader) {
813       return RemoteBlockReader.newBlockReader(fileName,
814           block, token, startOffset, length, conf.ioBufferSize,
815           verifyChecksum, clientName, peer, datanode,
816           clientContext.getPeerCache(), cachingStrategy);
817     } else {
818       return RemoteBlockReader2.newBlockReader(
819           fileName, block, token, startOffset, length,
820           verifyChecksum, clientName, peer, datanode,
821           clientContext.getPeerCache(), cachingStrategy);
822     }
823   }
824 
825   @Override
toString()826   public String toString() {
827     return "BlockReaderFactory(fileName=" + fileName + ", block=" + block + ")";
828   }
829 
830   /**
831    * File name to print when accessing a block directly (from servlets)
832    * @param s Address of the block location
833    * @param poolId Block pool ID of the block
834    * @param blockId Block ID of the block
835    * @return string that has a file name for debug purposes
836    */
getFileName(final InetSocketAddress s, final String poolId, final long blockId)837   public static String getFileName(final InetSocketAddress s,
838       final String poolId, final long blockId) {
839     return s.toString() + ":" + poolId + ":" + blockId;
840   }
841 }
842