/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.HdfsVolumeId;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

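/**
 * Utility methods that back the client-side BlockStorageLocation calls:
 * building per-datanode callables, querying datanodes in parallel, and
 * stitching the returned volume information back onto the original
 * {@link LocatedBlock}s.
 */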
@InterfaceAudience.Private
@InterfaceStability.Unstable
class BlockStorageLocationUtil {

  static final Log LOG = LogFactory
      .getLog(BlockStorageLocationUtil.class);

  /**
   * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
   * of datanodes and blocks. The blocks must all correspond to the same
   * block pool.
   *
   * @param datanodeBlocks
   *          Map of datanodes to block replicas at each datanode
   * @return callables used to query each datanode for location information on
   *         the block replicas at the datanode
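   *
   * <p>Datanodes that map to an empty block list are skipped, and an
   * {@link IllegalArgumentException} is thrown if the blocks for a single
   * datanode span more than one block pool.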
   */
  private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      int timeout, boolean connectToDnViaHostname, Span parent) {

    if (datanodeBlocks.isEmpty()) {
      return Lists.newArrayList();
    }

    // Construct the callables, one per datanode
    List<VolumeBlockLocationCallable> callables =
        new ArrayList<VolumeBlockLocationCallable>();
    for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
        .entrySet()) {
      // Construct RPC parameters
      DatanodeInfo datanode = entry.getKey();
      List<LocatedBlock> locatedBlocks = entry.getValue();
      if (locatedBlocks.isEmpty()) {
        continue;
      }

      // Ensure that the blocks all are from the same block pool.
      String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
      for (LocatedBlock lb : locatedBlocks) {
        if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
          throw new IllegalArgumentException(
              "All blocks to be queried must be in the same block pool: " +
              locatedBlocks.get(0).getBlock() + " and " + lb +
              " are from different pools.");
        }
      }

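      // Collect the block IDs and their access tokens so this datanode can be
      // queried with a single getHdfsBlocksMetadata RPC.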
      long[] blockIds = new long[locatedBlocks.size()];
      int i = 0;
      List<Token<BlockTokenIdentifier>> dnTokens =
          new ArrayList<Token<BlockTokenIdentifier>>(
          locatedBlocks.size());
      for (LocatedBlock b : locatedBlocks) {
        blockIds[i++] = b.getBlock().getBlockId();
        dnTokens.add(b.getBlockToken());
      }
      VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
          conf, datanode, poolId, blockIds, dnTokens, timeout,
          connectToDnViaHostname, parent);
      callables.add(callable);
    }
    return callables;
  }

  /**
   * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
   * making one RPC to each datanode. These RPCs are made in parallel using a
   * thread pool.
   *
   * @param datanodeBlocks
   *          Map of datanodes to the blocks present on the DN
   * @return metadatas Map of datanodes to the block metadata returned by each DN
   * @throws InvalidBlockTokenException
   *           if the client does not have read access on a requested block
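   *
   * <p>Illustrative sketch of a call; the pool size and timeout values here
   * are hypothetical, not defaults:
   * <pre>{@code
   * Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
   *     BlockStorageLocationUtil.queryDatanodesForHdfsBlocksMetadata(
   *         conf, datanodeBlocks, 10, 60000, false);
   * }</pre>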
   */
  static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      int poolsize, int timeoutMs, boolean connectToDnViaHostname)
        throws InvalidBlockTokenException {

    List<VolumeBlockLocationCallable> callables =
        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
            connectToDnViaHostname, Trace.currentSpan());

    // Use a thread pool to execute the Callables in parallel
    List<Future<HdfsBlocksMetadata>> futures =
        new ArrayList<Future<HdfsBlocksMetadata>>();
    ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
    try {
      futures = executor.invokeAll(callables, timeoutMs,
          TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Swallow the exception here, because we can return partial results
    }
    executor.shutdown();

    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
        Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
    // Fill in metadatas with results from DN RPCs, where possible
    for (int i = 0; i < futures.size(); i++) {
      VolumeBlockLocationCallable callable = callables.get(i);
      DatanodeInfo datanode = callable.getDatanodeInfo();
      Future<HdfsBlocksMetadata> future = futures.get(i);
      try {
        HdfsBlocksMetadata metadata = future.get();
        metadatas.put(datanode, metadata);
      } catch (CancellationException e) {
        LOG.info("Cancelled while waiting for datanode "
            + datanode.getIpcAddr(false) + ": " + e.toString());
      } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof InvalidBlockTokenException) {
          LOG.warn("Invalid access token when trying to retrieve "
              + "information from datanode " + datanode.getIpcAddr(false));
          throw (InvalidBlockTokenException) t;
        } else if (t instanceof UnsupportedOperationException) {
          LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
              + " required #getHdfsBlocksMetadata() API");
          throw (UnsupportedOperationException) t;
        } else {
          LOG.info("Failed to query block locations on datanode " +
              datanode.getIpcAddr(false) + ": " + t);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Could not fetch information from datanode", t);
        }
      } catch (InterruptedException e) {
        // Shouldn't happen, because invokeAll waits for all Futures to be ready
        LOG.info("Interrupted while fetching HdfsBlocksMetadata");
      }
    }

    return metadatas;
  }

  /**
   * Group the per-replica {@link VolumeId} info returned from
   * {@link #queryDatanodesForHdfsBlocksMetadata} to be associated with the
   * corresponding {@link LocatedBlock}.
   *
   * @param blocks
   *          Original list of LocatedBlocks
   * @param metadatas
   *          VolumeId information for the replicas on each datanode
   * @return blockVolumeIds per-replica VolumeId information associated with the
   *         parent LocatedBlock
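   *
   * <p>Replicas for which no datanode returned information are left as
   * {@code null} entries in the per-block list.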
   */
  static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
      List<LocatedBlock> blocks,
      Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {

    // Initialize mapping of block ID to LocatedBlock.
    // Used to associate results from DN RPCs to the parent LocatedBlock
    Map<Long, LocatedBlock> blockIdToLocBlock =
        new HashMap<Long, LocatedBlock>();
    for (LocatedBlock b : blocks) {
      blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
    }

    // Initialize the mapping of blocks -> list of VolumeIds, one per replica
    // This is filled out with real values from the DN RPCs
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
        new HashMap<LocatedBlock, List<VolumeId>>();
    for (LocatedBlock b : blocks) {
      ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
      for (int i = 0; i < b.getLocations().length; i++) {
        l.add(null);
      }
      blockVolumeIds.put(b, l);
    }

    // Iterate through the list of metadatas (one per datanode).
    // For each metadata, if it's valid, insert its volume location information
    // into the Map returned to the caller
    for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
      DatanodeInfo datanode = entry.getKey();
      HdfsBlocksMetadata metadata = entry.getValue();
      // Check if metadata is valid
      if (metadata == null) {
        continue;
      }
      long[] metaBlockIds = metadata.getBlockIds();
      List<byte[]> metaVolumeIds = metadata.getVolumeIds();
      List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
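      // The metadata holds parallel arrays: for block metaBlockIds[j],
      // metaVolumeIndexes.get(j) is an index into metaVolumeIds identifying
      // the volume that stores that replica.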
      // Add VolumeId for each replica in the HdfsBlocksMetadata
      for (int j = 0; j < metaBlockIds.length; j++) {
        int volumeIndex = metaVolumeIndexes.get(j);
        long blockId = metaBlockIds[j];
        // Skip if block wasn't found, or not a valid index into metaVolumeIds
        // Also skip if the DN responded with a block we didn't ask for
        if (volumeIndex == Integer.MAX_VALUE
            || volumeIndex >= metaVolumeIds.size()
            || !blockIdToLocBlock.containsKey(blockId)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("No data for block " + blockId);
          }
          continue;
        }
        // Get the VolumeId by indexing into the list of VolumeIds
        // provided by the datanode
        byte[] volumeId = metaVolumeIds.get(volumeIndex);
        HdfsVolumeId id = new HdfsVolumeId(volumeId);
        // Find out which index we are in the LocatedBlock's replicas
        LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
        DatanodeInfo[] dnInfos = locBlock.getLocations();
        int index = -1;
        for (int k = 0; k < dnInfos.length; k++) {
          if (dnInfos[k].equals(datanode)) {
            index = k;
            break;
          }
        }
        if (index < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Datanode responded with a block volume id we did" +
                " not request, omitting.");
          }
          continue;
        }
        // Place VolumeId at the same index as the DN's index in the list of
        // replicas
        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
        volumeIds.set(index, id);
      }
    }
    return blockVolumeIds;
  }

  /**
   * Helper method to combine a list of {@link LocatedBlock} with associated
   * {@link VolumeId} information to form an array of
   * {@link BlockStorageLocation}.
   */
  static BlockStorageLocation[] convertToVolumeBlockLocations(
      List<LocatedBlock> blocks,
      Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
    // Construct the final return value of VolumeBlockLocation[]
    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
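    // locations is index-aligned with blocks: locations[i] was derived from
    // blocks.get(i), so the loop below can pair them positionally.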
    List<BlockStorageLocation> volumeBlockLocs =
        new ArrayList<BlockStorageLocation>(locations.length);
    for (int i = 0; i < locations.length; i++) {
      LocatedBlock locBlock = blocks.get(i);
      List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
      BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
          volumeIds.toArray(new VolumeId[0]));
      volumeBlockLocs.add(bsLoc);
    }
    return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
  }

  /**
   * Callable that sets up an RPC proxy to a datanode and queries it for
   * volume location information for a list of ExtendedBlocks.
   */
  private static class VolumeBlockLocationCallable implements
    Callable<HdfsBlocksMetadata> {

    private final Configuration configuration;
    private final int timeout;
    private final DatanodeInfo datanode;
    private final String poolId;
    private final long[] blockIds;
    private final List<Token<BlockTokenIdentifier>> dnTokens;
    private final boolean connectToDnViaHostname;
    private final Span parentSpan;

    VolumeBlockLocationCallable(Configuration configuration,
        DatanodeInfo datanode, String poolId, long[] blockIds,
        List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
        boolean connectToDnViaHostname, Span parentSpan) {
      this.configuration = configuration;
      this.timeout = timeout;
      this.datanode = datanode;
      this.poolId = poolId;
      this.blockIds = blockIds;
      this.dnTokens = dnTokens;
      this.connectToDnViaHostname = connectToDnViaHostname;
      this.parentSpan = parentSpan;
    }

    public DatanodeInfo getDatanodeInfo() {
      return datanode;
    }

    @Override
    public HdfsBlocksMetadata call() throws Exception {
      HdfsBlocksMetadata metadata = null;
      // Create the RPC proxy and make the RPC
      ClientDatanodeProtocol cdp = null;
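      // Run the RPC as a child of the caller's trace span (if any) so the
      // per-datanode query is visible in the HTrace trace.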
      TraceScope scope =
          Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
      try {
        cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
            timeout, connectToDnViaHostname);
        metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
      } catch (IOException e) {
        // Bubble this up to the caller, handle with the Future
        throw e;
      } finally {
        scope.close();
        if (cdp != null) {
          RPC.stopProxy(cdp);
        }
      }
      return metadata;
    }
  }
}