/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.HdfsVolumeId;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

@InterfaceAudience.Private
@InterfaceStability.Unstable
class BlockStorageLocationUtil {

  static final Log LOG = LogFactory
      .getLog(BlockStorageLocationUtil.class);
  /**
   * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
   * of datanodes and blocks. The blocks must all correspond to the same
   * block pool.
   *
   * @param datanodeBlocks
   *          Map of datanodes to block replicas at each datanode
   * @return callables Used to query each datanode for location information on
   *         the block replicas at the datanode
   */
  private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      int timeout, boolean connectToDnViaHostname, Span parent) {

    if (datanodeBlocks.isEmpty()) {
      return Lists.newArrayList();
    }

    // Construct the callables, one per datanode
    List<VolumeBlockLocationCallable> callables =
        new ArrayList<VolumeBlockLocationCallable>();
    for (Map.Entry<DatanodeInfo, List<LocatedBlock>> entry : datanodeBlocks
        .entrySet()) {
      // Construct RPC parameters
      DatanodeInfo datanode = entry.getKey();
      List<LocatedBlock> locatedBlocks = entry.getValue();
      if (locatedBlocks.isEmpty()) {
        continue;
      }

      // Ensure that the blocks all are from the same block pool.
      String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
      for (LocatedBlock lb : locatedBlocks) {
        if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
          throw new IllegalArgumentException(
              "All blocks to be queried must be in the same block pool: " +
              locatedBlocks.get(0).getBlock() + " and " + lb +
              " are from different pools.");
        }
      }

      long[] blockIds = new long[locatedBlocks.size()];
      int i = 0;
      List<Token<BlockTokenIdentifier>> dnTokens =
          new ArrayList<Token<BlockTokenIdentifier>>(
              locatedBlocks.size());
      for (LocatedBlock b : locatedBlocks) {
        blockIds[i++] = b.getBlock().getBlockId();
        dnTokens.add(b.getBlockToken());
      }
      VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
          conf, datanode, poolId, blockIds, dnTokens, timeout,
          connectToDnViaHostname, parent);
      callables.add(callable);
    }
    return callables;
  }
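
  /*
   * Illustrative sketch, not part of the original class: one way a caller
   * could build the datanodeBlocks argument used above, by grouping the
   * replicas of each LocatedBlock under the datanode that reports a location
   * for it. The method name is hypothetical; it mirrors the grouping a
   * caller such as DFSClient performs before invoking these helpers.
   */
  private static Map<DatanodeInfo, List<LocatedBlock>> exampleGroupBlocksByDatanode(
      List<LocatedBlock> blocks) {
    Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
        new HashMap<DatanodeInfo, List<LocatedBlock>>();
    for (LocatedBlock b : blocks) {
      for (DatanodeInfo dn : b.getLocations()) {
        // One list of blocks per datanode; a block appears in the list of
        // every datanode that holds one of its replicas
        List<LocatedBlock> l = datanodeBlocks.get(dn);
        if (l == null) {
          l = new ArrayList<LocatedBlock>();
          datanodeBlocks.put(dn, l);
        }
        l.add(b);
      }
    }
    return datanodeBlocks;
  }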
  /**
   * Queries datanodes for the blocks specified in <code>datanodeBlocks</code>,
   * making one RPC to each datanode. These RPCs are made in parallel using a
   * threadpool.
   *
   * @param datanodeBlocks
   *          Map of datanodes to the blocks present on the DN
   * @return metadatas Map of datanodes to block metadata of the DN
   * @throws InvalidBlockTokenException
   *           if client does not have read access on a requested block
   */
  static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      int poolsize, int timeoutMs, boolean connectToDnViaHostname)
      throws InvalidBlockTokenException {

    List<VolumeBlockLocationCallable> callables =
        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
            connectToDnViaHostname, Trace.currentSpan());

    // Use a thread pool to execute the Callables in parallel
    List<Future<HdfsBlocksMetadata>> futures =
        new ArrayList<Future<HdfsBlocksMetadata>>();
    ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
    try {
      futures = executor.invokeAll(callables, timeoutMs,
          TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Swallow the exception here, because we can return partial results
    }
    executor.shutdown();

    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
        Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
    // Fill in metadatas with results from DN RPCs, where possible
    for (int i = 0; i < futures.size(); i++) {
      VolumeBlockLocationCallable callable = callables.get(i);
      DatanodeInfo datanode = callable.getDatanodeInfo();
      Future<HdfsBlocksMetadata> future = futures.get(i);
      try {
        HdfsBlocksMetadata metadata = future.get();
        metadatas.put(callable.getDatanodeInfo(), metadata);
      } catch (CancellationException e) {
        LOG.info("Cancelled while waiting for datanode "
            + datanode.getIpcAddr(false) + ": " + e.toString());
      } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof InvalidBlockTokenException) {
          LOG.warn("Invalid access token when trying to retrieve "
              + "information from datanode " + datanode.getIpcAddr(false));
          throw (InvalidBlockTokenException) t;
        } else if (t instanceof UnsupportedOperationException) {
          LOG.info("Datanode " + datanode.getIpcAddr(false) + " does not support"
              + " required #getHdfsBlocksMetadata() API");
          throw (UnsupportedOperationException) t;
        } else {
          LOG.info("Failed to query block locations on datanode " +
              datanode.getIpcAddr(false) + ": " + t);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Could not fetch information from datanode", t);
        }
      } catch (InterruptedException e) {
        // Shouldn't happen, because invokeAll waits for all Futures to be ready
        LOG.info("Interrupted while fetching HdfsBlocksMetadata");
      }
    }

    return metadatas;
  }
  /**
   * Group the per-replica {@link VolumeId} info returned from
   * {@link DFSClient#queryDatanodesForHdfsBlocksMetadata(Map)} to be
   * associated with the corresponding {@link LocatedBlock}.
   *
   * @param blocks
   *          Original LocatedBlock array
   * @param metadatas
   *          VolumeId information for the replicas on each datanode
   * @return blockVolumeIds per-replica VolumeId information associated with the
   *         parent LocatedBlock
   */
  static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
      List<LocatedBlock> blocks,
      Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {

    // Initialize mapping of ExtendedBlock to LocatedBlock.
    // Used to associate results from DN RPCs to the parent LocatedBlock
    Map<Long, LocatedBlock> blockIdToLocBlock =
        new HashMap<Long, LocatedBlock>();
    for (LocatedBlock b : blocks) {
      blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
    }

    // Initialize the mapping of blocks -> list of VolumeIds, one per replica
    // This is filled out with real values from the DN RPCs
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
        new HashMap<LocatedBlock, List<VolumeId>>();
    for (LocatedBlock b : blocks) {
      ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
      for (int i = 0; i < b.getLocations().length; i++) {
        l.add(null);
      }
      blockVolumeIds.put(b, l);
    }

    // Iterate through the list of metadatas (one per datanode).
    // For each metadata, if it's valid, insert its volume location information
    // into the Map returned to the caller
    for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
      DatanodeInfo datanode = entry.getKey();
      HdfsBlocksMetadata metadata = entry.getValue();
      // Check if metadata is valid
      if (metadata == null) {
        continue;
      }
      long[] metaBlockIds = metadata.getBlockIds();
      List<byte[]> metaVolumeIds = metadata.getVolumeIds();
      List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
      // Add VolumeId for each replica in the HdfsBlocksMetadata
      for (int j = 0; j < metaBlockIds.length; j++) {
        int volumeIndex = metaVolumeIndexes.get(j);
        long blockId = metaBlockIds[j];
        // Skip if block wasn't found, or not a valid index into metaVolumeIds
        // Also skip if the DN responded with a block we didn't ask for
        if (volumeIndex == Integer.MAX_VALUE
            || volumeIndex >= metaVolumeIds.size()
            || !blockIdToLocBlock.containsKey(blockId)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("No data for block " + blockId);
          }
          continue;
        }
        // Get the VolumeId by indexing into the list of VolumeIds
        // provided by the datanode
        byte[] volumeId = metaVolumeIds.get(volumeIndex);
        HdfsVolumeId id = new HdfsVolumeId(volumeId);
        // Find out which index we are in the LocatedBlock's replicas
        LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
        DatanodeInfo[] dnInfos = locBlock.getLocations();
        int index = -1;
        for (int k = 0; k < dnInfos.length; k++) {
          if (dnInfos[k].equals(datanode)) {
            index = k;
            break;
          }
        }
        if (index < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Datanode responded with a block volume id we did" +
                " not request, omitting.");
          }
          continue;
        }
        // Place VolumeId at the same index as the DN's index in the list of
        // replicas
        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
        volumeIds.set(index, id);
      }
    }
    return blockVolumeIds;
  }
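
  /*
   * Illustrative sketch, not part of the original class: how the static
   * helpers in this file are meant to be chained by a caller such as
   * DFSClient#getBlockStorageLocations. The method name and the pool-size
   * and timeout values below are assumptions for illustration only.
   */
  private static BlockStorageLocation[] exampleGetBlockStorageLocations(
      Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
      List<LocatedBlock> blocks) throws IOException {
    // Query every datanode in parallel; missing or failed datanodes simply
    // leave null VolumeIds for their replicas
    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
        queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
            10 /* assumed RPC thread pool size */,
            60000 /* assumed per-query timeout, ms */,
            false /* connect to DN via IP rather than hostname */);
    // Match each returned VolumeId to its replica's slot in the parent block
    Map<LocatedBlock, List<VolumeId>> blockVolumeIds =
        associateVolumeIdsWithBlocks(blocks, metadatas);
    // Fold the VolumeIds into the public BlockStorageLocation type
    return convertToVolumeBlockLocations(blocks, blockVolumeIds);
  }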
  /**
   * Helper method to combine a list of {@link LocatedBlock} with associated
   * {@link VolumeId} information to form a list of
   * {@link BlockStorageLocation}.
   */
  static BlockStorageLocation[] convertToVolumeBlockLocations(
      List<LocatedBlock> blocks,
      Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
    // Construct the final return value of VolumeBlockLocation[]
    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
    List<BlockStorageLocation> volumeBlockLocs =
        new ArrayList<BlockStorageLocation>(locations.length);
    for (int i = 0; i < locations.length; i++) {
      LocatedBlock locBlock = blocks.get(i);
      List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
      BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
          volumeIds.toArray(new VolumeId[0]));
      volumeBlockLocs.add(bsLoc);
    }
    return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
  }

  /**
   * Callable that sets up an RPC proxy to a datanode and queries it for
   * volume location information for a list of ExtendedBlocks.
   */
  private static class VolumeBlockLocationCallable implements
      Callable<HdfsBlocksMetadata> {

    private final Configuration configuration;
    private final int timeout;
    private final DatanodeInfo datanode;
    private final String poolId;
    private final long[] blockIds;
    private final List<Token<BlockTokenIdentifier>> dnTokens;
    private final boolean connectToDnViaHostname;
    private final Span parentSpan;

    VolumeBlockLocationCallable(Configuration configuration,
        DatanodeInfo datanode, String poolId, long[] blockIds,
        List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
        boolean connectToDnViaHostname, Span parentSpan) {
      this.configuration = configuration;
      this.timeout = timeout;
      this.datanode = datanode;
      this.poolId = poolId;
      this.blockIds = blockIds;
      this.dnTokens = dnTokens;
      this.connectToDnViaHostname = connectToDnViaHostname;
      this.parentSpan = parentSpan;
    }

    public DatanodeInfo getDatanodeInfo() {
      return datanode;
    }

    @Override
    public HdfsBlocksMetadata call() throws Exception {
      HdfsBlocksMetadata metadata = null;
      // Create the RPC proxy and make the RPC
      ClientDatanodeProtocol cdp = null;
      TraceScope scope =
          Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
      try {
        cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
            timeout, connectToDnViaHostname);
        metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
      } catch (IOException e) {
        // Bubble this up to the caller, handle with the Future
        throw e;
      } finally {
        scope.close();
        if (cdp != null) {
          RPC.stopProxy(cdp);
        }
      }
      return metadata;
    }
  }
}