1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.blockmanagement; 19 20 import static org.apache.hadoop.util.ExitUtil.terminate; 21 22 import java.io.Closeable; 23 import java.io.IOException; 24 import java.util.ArrayList; 25 import java.util.Collection; 26 import java.util.Date; 27 import java.util.Iterator; 28 import java.util.LinkedList; 29 import java.util.List; 30 import java.util.Random; 31 import java.util.TreeMap; 32 import java.util.concurrent.TimeUnit; 33 import java.util.concurrent.locks.Condition; 34 import java.util.concurrent.locks.ReentrantLock; 35 36 import org.apache.hadoop.classification.InterfaceAudience; 37 import org.apache.hadoop.fs.UnresolvedLinkException; 38 import org.apache.hadoop.hdfs.protocol.Block; 39 import org.apache.hadoop.hdfs.protocol.CacheDirective; 40 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; 41 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; 42 import org.apache.hadoop.hdfs.server.namenode.CacheManager; 43 import org.apache.hadoop.hdfs.server.namenode.CachePool; 44 import org.apache.hadoop.hdfs.server.namenode.CachedBlock; 45 import org.apache.hadoop.hdfs.server.namenode.FSDirectory; 46 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; 47 import org.apache.hadoop.hdfs.server.namenode.INode; 48 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory; 49 import org.apache.hadoop.hdfs.server.namenode.INodeFile; 50 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; 51 import org.apache.hadoop.hdfs.util.ReadOnlyList; 52 import org.apache.hadoop.util.GSet; 53 import org.apache.hadoop.util.Time; 54 import org.slf4j.Logger; 55 import org.slf4j.LoggerFactory; 56 57 import com.google.common.base.Preconditions; 58 ; 59 60 /** 61 * Scans the namesystem, scheduling blocks to be cached as appropriate. 62 * 63 * The CacheReplicationMonitor does a full scan when the NameNode first 64 * starts up, and at configurable intervals afterwards. 65 */ 66 @InterfaceAudience.LimitedPrivate({"HDFS"}) 67 public class CacheReplicationMonitor extends Thread implements Closeable { 68 69 private static final Logger LOG = 70 LoggerFactory.getLogger(CacheReplicationMonitor.class); 71 72 private final FSNamesystem namesystem; 73 74 private final BlockManager blockManager; 75 76 private final CacheManager cacheManager; 77 78 private final GSet<CachedBlock, CachedBlock> cachedBlocks; 79 80 /** 81 * Pseudorandom number source 82 */ 83 private static final Random random = new Random(); 84 85 /** 86 * The interval at which we scan the namesystem for caching changes. 87 */ 88 private final long intervalMs; 89 90 /** 91 * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and 92 * waiting for rescan operations. 93 */ 94 private final ReentrantLock lock; 95 96 /** 97 * Notifies the scan thread that an immediate rescan is needed. 98 */ 99 private final Condition doRescan; 100 101 /** 102 * Notifies waiting threads that a rescan has finished. 103 */ 104 private final Condition scanFinished; 105 106 /** 107 * The number of rescans completed. Used to wait for scans to finish. 108 * Protected by the CacheReplicationMonitor lock. 109 */ 110 private long completedScanCount = 0; 111 112 /** 113 * The scan we're currently performing, or -1 if no scan is in progress. 114 * Protected by the CacheReplicationMonitor lock. 115 */ 116 private long curScanCount = -1; 117 118 /** 119 * The number of rescans we need to complete. Protected by the CRM lock. 120 */ 121 private long neededScanCount = 0; 122 123 /** 124 * True if this monitor should terminate. Protected by the CRM lock. 125 */ 126 private boolean shutdown = false; 127 128 /** 129 * Mark status of the current scan. 130 */ 131 private boolean mark = false; 132 133 /** 134 * Cache directives found in the previous scan. 135 */ 136 private int scannedDirectives; 137 138 /** 139 * Blocks found in the previous scan. 140 */ 141 private long scannedBlocks; 142 CacheReplicationMonitor(FSNamesystem namesystem, CacheManager cacheManager, long intervalMs, ReentrantLock lock)143 public CacheReplicationMonitor(FSNamesystem namesystem, 144 CacheManager cacheManager, long intervalMs, ReentrantLock lock) { 145 this.namesystem = namesystem; 146 this.blockManager = namesystem.getBlockManager(); 147 this.cacheManager = cacheManager; 148 this.cachedBlocks = cacheManager.getCachedBlocks(); 149 this.intervalMs = intervalMs; 150 this.lock = lock; 151 this.doRescan = this.lock.newCondition(); 152 this.scanFinished = this.lock.newCondition(); 153 } 154 155 @Override run()156 public void run() { 157 long startTimeMs = 0; 158 Thread.currentThread().setName("CacheReplicationMonitor(" + 159 System.identityHashCode(this) + ")"); 160 LOG.info("Starting CacheReplicationMonitor with interval " + 161 intervalMs + " milliseconds"); 162 try { 163 long curTimeMs = Time.monotonicNow(); 164 while (true) { 165 lock.lock(); 166 try { 167 while (true) { 168 if (shutdown) { 169 LOG.debug("Shutting down CacheReplicationMonitor"); 170 return; 171 } 172 if (completedScanCount < neededScanCount) { 173 LOG.debug("Rescanning because of pending operations"); 174 break; 175 } 176 long delta = (startTimeMs + intervalMs) - curTimeMs; 177 if (delta <= 0) { 178 LOG.debug("Rescanning after {} milliseconds", (curTimeMs - startTimeMs)); 179 break; 180 } 181 doRescan.await(delta, TimeUnit.MILLISECONDS); 182 curTimeMs = Time.monotonicNow(); 183 } 184 } finally { 185 lock.unlock(); 186 } 187 startTimeMs = curTimeMs; 188 mark = !mark; 189 rescan(); 190 curTimeMs = Time.monotonicNow(); 191 // Update synchronization-related variables. 192 lock.lock(); 193 try { 194 completedScanCount = curScanCount; 195 curScanCount = -1; 196 scanFinished.signalAll(); 197 } finally { 198 lock.unlock(); 199 } 200 LOG.debug("Scanned {} directive(s) and {} block(s) in {} millisecond(s).", 201 scannedDirectives, scannedBlocks, (curTimeMs - startTimeMs)); 202 } 203 } catch (InterruptedException e) { 204 LOG.info("Shutting down CacheReplicationMonitor."); 205 return; 206 } catch (Throwable t) { 207 LOG.error("Thread exiting", t); 208 terminate(1, t); 209 } 210 } 211 212 /** 213 * Waits for a rescan to complete. This doesn't guarantee consistency with 214 * pending operations, only relative recency, since it will not force a new 215 * rescan if a rescan is already underway. 216 * <p> 217 * Note that this call will release the FSN lock, so operations before and 218 * after are not atomic. 219 */ waitForRescanIfNeeded()220 public void waitForRescanIfNeeded() { 221 Preconditions.checkArgument(!namesystem.hasWriteLock(), 222 "Must not hold the FSN write lock when waiting for a rescan."); 223 Preconditions.checkArgument(lock.isHeldByCurrentThread(), 224 "Must hold the CRM lock when waiting for a rescan."); 225 if (neededScanCount <= completedScanCount) { 226 return; 227 } 228 // If no scan is already ongoing, mark the CRM as dirty and kick 229 if (curScanCount < 0) { 230 doRescan.signal(); 231 } 232 // Wait until the scan finishes and the count advances 233 while ((!shutdown) && (completedScanCount < neededScanCount)) { 234 try { 235 scanFinished.await(); 236 } catch (InterruptedException e) { 237 LOG.warn("Interrupted while waiting for CacheReplicationMonitor" 238 + " rescan", e); 239 break; 240 } 241 } 242 } 243 244 /** 245 * Indicates to the CacheReplicationMonitor that there have been CacheManager 246 * changes that require a rescan. 247 */ setNeedsRescan()248 public void setNeedsRescan() { 249 Preconditions.checkArgument(lock.isHeldByCurrentThread(), 250 "Must hold the CRM lock when setting the needsRescan bit."); 251 if (curScanCount >= 0) { 252 // If there is a scan in progress, we need to wait for the scan after 253 // that. 254 neededScanCount = curScanCount + 1; 255 } else { 256 // If there is no scan in progress, we need to wait for the next scan. 257 neededScanCount = completedScanCount + 1; 258 } 259 } 260 261 /** 262 * Shut down the monitor thread. 263 */ 264 @Override close()265 public void close() throws IOException { 266 Preconditions.checkArgument(namesystem.hasWriteLock()); 267 lock.lock(); 268 try { 269 if (shutdown) return; 270 // Since we hold both the FSN write lock and the CRM lock here, 271 // we know that the CRM thread cannot be currently modifying 272 // the cache manager state while we're closing it. 273 // Since the CRM thread checks the value of 'shutdown' after waiting 274 // for a lock, we know that the thread will not modify the cache 275 // manager state after this point. 276 shutdown = true; 277 doRescan.signalAll(); 278 scanFinished.signalAll(); 279 } finally { 280 lock.unlock(); 281 } 282 } 283 rescan()284 private void rescan() throws InterruptedException { 285 scannedDirectives = 0; 286 scannedBlocks = 0; 287 try { 288 namesystem.writeLock(); 289 try { 290 lock.lock(); 291 if (shutdown) { 292 throw new InterruptedException("CacheReplicationMonitor was " + 293 "shut down."); 294 } 295 curScanCount = completedScanCount + 1; 296 } finally { 297 lock.unlock(); 298 } 299 300 resetStatistics(); 301 rescanCacheDirectives(); 302 rescanCachedBlockMap(); 303 blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime(); 304 } finally { 305 namesystem.writeUnlock(); 306 } 307 } 308 resetStatistics()309 private void resetStatistics() { 310 for (CachePool pool: cacheManager.getCachePools()) { 311 pool.resetStatistics(); 312 } 313 for (CacheDirective directive: cacheManager.getCacheDirectives()) { 314 directive.resetStatistics(); 315 } 316 } 317 318 /** 319 * Scan all CacheDirectives. Use the information to figure out 320 * what cache replication factor each block should have. 321 */ rescanCacheDirectives()322 private void rescanCacheDirectives() { 323 FSDirectory fsDir = namesystem.getFSDirectory(); 324 final long now = new Date().getTime(); 325 for (CacheDirective directive : cacheManager.getCacheDirectives()) { 326 scannedDirectives++; 327 // Skip processing this entry if it has expired 328 if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) { 329 LOG.debug("Directive {}: the directive expired at {} (now = {})", 330 directive.getId(), directive.getExpiryTime(), now); 331 continue; 332 } 333 String path = directive.getPath(); 334 INode node; 335 try { 336 node = fsDir.getINode(path); 337 } catch (UnresolvedLinkException e) { 338 // We don't cache through symlinks 339 LOG.debug("Directive {}: got UnresolvedLinkException while resolving " 340 + "path {}", directive.getId(), path 341 ); 342 continue; 343 } 344 if (node == null) { 345 LOG.debug("Directive {}: No inode found at {}", directive.getId(), 346 path); 347 } else if (node.isDirectory()) { 348 INodeDirectory dir = node.asDirectory(); 349 ReadOnlyList<INode> children = dir 350 .getChildrenList(Snapshot.CURRENT_STATE_ID); 351 for (INode child : children) { 352 if (child.isFile()) { 353 rescanFile(directive, child.asFile()); 354 } 355 } 356 } else if (node.isFile()) { 357 rescanFile(directive, node.asFile()); 358 } else { 359 LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ", 360 directive.getId(), node); 361 } 362 } 363 } 364 365 /** 366 * Apply a CacheDirective to a file. 367 * 368 * @param directive The CacheDirective to apply. 369 * @param file The file. 370 */ rescanFile(CacheDirective directive, INodeFile file)371 private void rescanFile(CacheDirective directive, INodeFile file) { 372 BlockInfoContiguous[] blockInfos = file.getBlocks(); 373 374 // Increment the "needed" statistics 375 directive.addFilesNeeded(1); 376 // We don't cache UC blocks, don't add them to the total here 377 long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() * 378 directive.getReplication(); 379 directive.addBytesNeeded(neededTotal); 380 381 // The pool's bytesNeeded is incremented as we scan. If the demand 382 // thus far plus the demand of this file would exceed the pool's limit, 383 // do not cache this file. 384 CachePool pool = directive.getPool(); 385 if (pool.getBytesNeeded() > pool.getLimit()) { 386 LOG.debug("Directive {}: not scanning file {} because " + 387 "bytesNeeded for pool {} is {}, but the pool's limit is {}", 388 directive.getId(), 389 file.getFullPathName(), 390 pool.getPoolName(), 391 pool.getBytesNeeded(), 392 pool.getLimit()); 393 return; 394 } 395 396 long cachedTotal = 0; 397 for (BlockInfoContiguous blockInfo : blockInfos) { 398 if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) { 399 // We don't try to cache blocks that are under construction. 400 LOG.trace("Directive {}: can't cache block {} because it is in state " 401 + "{}, not COMPLETE.", directive.getId(), blockInfo, 402 blockInfo.getBlockUCState() 403 ); 404 continue; 405 } 406 Block block = new Block(blockInfo.getBlockId()); 407 CachedBlock ncblock = new CachedBlock(block.getBlockId(), 408 directive.getReplication(), mark); 409 CachedBlock ocblock = cachedBlocks.get(ncblock); 410 if (ocblock == null) { 411 cachedBlocks.put(ncblock); 412 ocblock = ncblock; 413 } else { 414 // Update bytesUsed using the current replication levels. 415 // Assumptions: we assume that all the blocks are the same length 416 // on each datanode. We can assume this because we're only caching 417 // blocks in state COMPLETE. 418 // Note that if two directives are caching the same block(s), they will 419 // both get them added to their bytesCached. 420 List<DatanodeDescriptor> cachedOn = 421 ocblock.getDatanodes(Type.CACHED); 422 long cachedByBlock = Math.min(cachedOn.size(), 423 directive.getReplication()) * blockInfo.getNumBytes(); 424 cachedTotal += cachedByBlock; 425 426 if ((mark != ocblock.getMark()) || 427 (ocblock.getReplication() < directive.getReplication())) { 428 // 429 // Overwrite the block's replication and mark in two cases: 430 // 431 // 1. If the mark on the CachedBlock is different from the mark for 432 // this scan, that means the block hasn't been updated during this 433 // scan, and we should overwrite whatever is there, since it is no 434 // longer valid. 435 // 436 // 2. If the replication in the CachedBlock is less than what the 437 // directive asks for, we want to increase the block's replication 438 // field to what the directive asks for. 439 // 440 ocblock.setReplicationAndMark(directive.getReplication(), mark); 441 } 442 } 443 LOG.trace("Directive {}: setting replication for block {} to {}", 444 directive.getId(), blockInfo, ocblock.getReplication()); 445 } 446 // Increment the "cached" statistics 447 directive.addBytesCached(cachedTotal); 448 if (cachedTotal == neededTotal) { 449 directive.addFilesCached(1); 450 } 451 LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(), 452 file.getFullPathName(), cachedTotal, neededTotal); 453 } 454 findReasonForNotCaching(CachedBlock cblock, BlockInfoContiguous blockInfo)455 private String findReasonForNotCaching(CachedBlock cblock, 456 BlockInfoContiguous blockInfo) { 457 if (blockInfo == null) { 458 // Somehow, a cache report with the block arrived, but the block 459 // reports from the DataNode haven't (yet?) described such a block. 460 // Alternately, the NameNode might have invalidated the block, but the 461 // DataNode hasn't caught up. In any case, we want to tell the DN 462 // to uncache this. 463 return "not tracked by the BlockManager"; 464 } else if (!blockInfo.isComplete()) { 465 // When a cached block changes state from complete to some other state 466 // on the DataNode (perhaps because of append), it will begin the 467 // uncaching process. However, the uncaching process is not 468 // instantaneous, especially if clients have pinned the block. So 469 // there may be a period of time when incomplete blocks remain cached 470 // on the DataNodes. 471 return "not complete"; 472 } else if (cblock.getReplication() == 0) { 473 // Since 0 is not a valid value for a cache directive's replication 474 // field, seeing a replication of 0 on a CacheBlock means that it 475 // has never been reached by any sweep. 476 return "not needed by any directives"; 477 } else if (cblock.getMark() != mark) { 478 // Although the block was needed in the past, we didn't reach it during 479 // the current sweep. Therefore, it doesn't need to be cached any more. 480 // Need to set the replication to 0 so it doesn't flip back to cached 481 // when the mark flips on the next scan 482 cblock.setReplicationAndMark((short)0, mark); 483 return "no longer needed by any directives"; 484 } 485 return null; 486 } 487 488 /** 489 * Scan through the cached block map. 490 * Any blocks which are under-replicated should be assigned new Datanodes. 491 * Blocks that are over-replicated should be removed from Datanodes. 492 */ rescanCachedBlockMap()493 private void rescanCachedBlockMap() { 494 for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator(); 495 cbIter.hasNext(); ) { 496 scannedBlocks++; 497 CachedBlock cblock = cbIter.next(); 498 List<DatanodeDescriptor> pendingCached = 499 cblock.getDatanodes(Type.PENDING_CACHED); 500 List<DatanodeDescriptor> cached = 501 cblock.getDatanodes(Type.CACHED); 502 List<DatanodeDescriptor> pendingUncached = 503 cblock.getDatanodes(Type.PENDING_UNCACHED); 504 // Remove nodes from PENDING_UNCACHED if they were actually uncached. 505 for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator(); 506 iter.hasNext(); ) { 507 DatanodeDescriptor datanode = iter.next(); 508 if (!cblock.isInList(datanode.getCached())) { 509 LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} " 510 + "because the DataNode uncached it.", cblock.getBlockId(), 511 datanode.getDatanodeUuid()); 512 datanode.getPendingUncached().remove(cblock); 513 iter.remove(); 514 } 515 } 516 BlockInfoContiguous blockInfo = blockManager. 517 getStoredBlock(new Block(cblock.getBlockId())); 518 String reason = findReasonForNotCaching(cblock, blockInfo); 519 int neededCached = 0; 520 if (reason != null) { 521 LOG.trace("Block {}: can't cache block because it is {}", 522 cblock.getBlockId(), reason); 523 } else { 524 neededCached = cblock.getReplication(); 525 } 526 int numCached = cached.size(); 527 if (numCached >= neededCached) { 528 // If we have enough replicas, drop all pending cached. 529 for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator(); 530 iter.hasNext(); ) { 531 DatanodeDescriptor datanode = iter.next(); 532 datanode.getPendingCached().remove(cblock); 533 iter.remove(); 534 LOG.trace("Block {}: removing from PENDING_CACHED for node {}" 535 + "because we already have {} cached replicas and we only" + 536 " need {}", 537 cblock.getBlockId(), datanode.getDatanodeUuid(), numCached, 538 neededCached 539 ); 540 } 541 } 542 if (numCached < neededCached) { 543 // If we don't have enough replicas, drop all pending uncached. 544 for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator(); 545 iter.hasNext(); ) { 546 DatanodeDescriptor datanode = iter.next(); 547 datanode.getPendingUncached().remove(cblock); 548 iter.remove(); 549 LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} " 550 + "because we only have {} cached replicas and we need " + 551 "{}", cblock.getBlockId(), datanode.getDatanodeUuid(), 552 numCached, neededCached 553 ); 554 } 555 } 556 int neededUncached = numCached - 557 (pendingUncached.size() + neededCached); 558 if (neededUncached > 0) { 559 addNewPendingUncached(neededUncached, cblock, cached, 560 pendingUncached); 561 } else { 562 int additionalCachedNeeded = neededCached - 563 (numCached + pendingCached.size()); 564 if (additionalCachedNeeded > 0) { 565 addNewPendingCached(additionalCachedNeeded, cblock, cached, 566 pendingCached); 567 } 568 } 569 if ((neededCached == 0) && 570 pendingUncached.isEmpty() && 571 pendingCached.isEmpty()) { 572 // we have nothing more to do with this block. 573 LOG.trace("Block {}: removing from cachedBlocks, since neededCached " 574 + "== 0, and pendingUncached and pendingCached are empty.", 575 cblock.getBlockId() 576 ); 577 cbIter.remove(); 578 } 579 } 580 } 581 582 /** 583 * Add new entries to the PendingUncached list. 584 * 585 * @param neededUncached The number of replicas that need to be uncached. 586 * @param cachedBlock The block which needs to be uncached. 587 * @param cached A list of DataNodes currently caching the block. 588 * @param pendingUncached A list of DataNodes that will soon uncache the 589 * block. 590 */ addNewPendingUncached(int neededUncached, CachedBlock cachedBlock, List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingUncached)591 private void addNewPendingUncached(int neededUncached, 592 CachedBlock cachedBlock, List<DatanodeDescriptor> cached, 593 List<DatanodeDescriptor> pendingUncached) { 594 // Figure out which replicas can be uncached. 595 LinkedList<DatanodeDescriptor> possibilities = 596 new LinkedList<DatanodeDescriptor>(); 597 for (DatanodeDescriptor datanode : cached) { 598 if (!pendingUncached.contains(datanode)) { 599 possibilities.add(datanode); 600 } 601 } 602 while (neededUncached > 0) { 603 if (possibilities.isEmpty()) { 604 LOG.warn("Logic error: we're trying to uncache more replicas than " + 605 "actually exist for " + cachedBlock); 606 return; 607 } 608 DatanodeDescriptor datanode = 609 possibilities.remove(random.nextInt(possibilities.size())); 610 pendingUncached.add(datanode); 611 boolean added = datanode.getPendingUncached().add(cachedBlock); 612 assert added; 613 neededUncached--; 614 } 615 } 616 617 /** 618 * Add new entries to the PendingCached list. 619 * 620 * @param neededCached The number of replicas that need to be cached. 621 * @param cachedBlock The block which needs to be cached. 622 * @param cached A list of DataNodes currently caching the block. 623 * @param pendingCached A list of DataNodes that will soon cache the 624 * block. 625 */ addNewPendingCached(final int neededCached, CachedBlock cachedBlock, List<DatanodeDescriptor> cached, List<DatanodeDescriptor> pendingCached)626 private void addNewPendingCached(final int neededCached, 627 CachedBlock cachedBlock, List<DatanodeDescriptor> cached, 628 List<DatanodeDescriptor> pendingCached) { 629 // To figure out which replicas can be cached, we consult the 630 // blocksMap. We don't want to try to cache a corrupt replica, though. 631 BlockInfoContiguous blockInfo = blockManager. 632 getStoredBlock(new Block(cachedBlock.getBlockId())); 633 if (blockInfo == null) { 634 LOG.debug("Block {}: can't add new cached replicas," + 635 " because there is no record of this block " + 636 "on the NameNode.", cachedBlock.getBlockId()); 637 return; 638 } 639 if (!blockInfo.isComplete()) { 640 LOG.debug("Block {}: can't cache this block, because it is not yet" 641 + " complete.", cachedBlock.getBlockId()); 642 return; 643 } 644 // Filter the list of replicas to only the valid targets 645 List<DatanodeDescriptor> possibilities = 646 new LinkedList<DatanodeDescriptor>(); 647 int numReplicas = blockInfo.getCapacity(); 648 Collection<DatanodeDescriptor> corrupt = 649 blockManager.getCorruptReplicas(blockInfo); 650 int outOfCapacity = 0; 651 for (int i = 0; i < numReplicas; i++) { 652 DatanodeDescriptor datanode = blockInfo.getDatanode(i); 653 if (datanode == null) { 654 continue; 655 } 656 if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { 657 continue; 658 } 659 if (corrupt != null && corrupt.contains(datanode)) { 660 continue; 661 } 662 if (pendingCached.contains(datanode) || cached.contains(datanode)) { 663 continue; 664 } 665 long pendingBytes = 0; 666 // Subtract pending cached blocks from effective capacity 667 Iterator<CachedBlock> it = datanode.getPendingCached().iterator(); 668 while (it.hasNext()) { 669 CachedBlock cBlock = it.next(); 670 BlockInfoContiguous info = 671 blockManager.getStoredBlock(new Block(cBlock.getBlockId())); 672 if (info != null) { 673 pendingBytes -= info.getNumBytes(); 674 } 675 } 676 it = datanode.getPendingUncached().iterator(); 677 // Add pending uncached blocks from effective capacity 678 while (it.hasNext()) { 679 CachedBlock cBlock = it.next(); 680 BlockInfoContiguous info = 681 blockManager.getStoredBlock(new Block(cBlock.getBlockId())); 682 if (info != null) { 683 pendingBytes += info.getNumBytes(); 684 } 685 } 686 long pendingCapacity = pendingBytes + datanode.getCacheRemaining(); 687 if (pendingCapacity < blockInfo.getNumBytes()) { 688 LOG.trace("Block {}: DataNode {} is not a valid possibility " + 689 "because the block has size {}, but the DataNode only has {}" + 690 "bytes of cache remaining ({} pending bytes, {} already cached.", 691 blockInfo.getBlockId(), datanode.getDatanodeUuid(), 692 blockInfo.getNumBytes(), pendingCapacity, pendingBytes, 693 datanode.getCacheRemaining()); 694 outOfCapacity++; 695 continue; 696 } 697 possibilities.add(datanode); 698 } 699 List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities, 700 neededCached, blockManager.getDatanodeManager().getStaleInterval()); 701 for (DatanodeDescriptor datanode : chosen) { 702 LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}", 703 blockInfo.getBlockId(), datanode.getDatanodeUuid()); 704 pendingCached.add(datanode); 705 boolean added = datanode.getPendingCached().add(cachedBlock); 706 assert added; 707 } 708 // We were unable to satisfy the requested replication factor 709 if (neededCached > chosen.size()) { 710 LOG.debug("Block {}: we only have {} of {} cached replicas." 711 + " {} DataNodes have insufficient cache capacity.", 712 blockInfo.getBlockId(), 713 (cachedBlock.getReplication() - neededCached + chosen.size()), 714 cachedBlock.getReplication(), outOfCapacity 715 ); 716 } 717 } 718 719 /** 720 * Chooses datanode locations for caching from a list of valid possibilities. 721 * Non-stale nodes are chosen before stale nodes. 722 * 723 * @param possibilities List of candidate datanodes 724 * @param neededCached Number of replicas needed 725 * @param staleInterval Age of a stale datanode 726 * @return A list of chosen datanodes 727 */ chooseDatanodesForCaching( final List<DatanodeDescriptor> possibilities, final int neededCached, final long staleInterval)728 private static List<DatanodeDescriptor> chooseDatanodesForCaching( 729 final List<DatanodeDescriptor> possibilities, final int neededCached, 730 final long staleInterval) { 731 // Make a copy that we can modify 732 List<DatanodeDescriptor> targets = 733 new ArrayList<DatanodeDescriptor>(possibilities); 734 // Selected targets 735 List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>(); 736 737 // Filter out stale datanodes 738 List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>(); 739 Iterator<DatanodeDescriptor> it = targets.iterator(); 740 while (it.hasNext()) { 741 DatanodeDescriptor d = it.next(); 742 if (d.isStale(staleInterval)) { 743 it.remove(); 744 stale.add(d); 745 } 746 } 747 // Select targets 748 while (chosen.size() < neededCached) { 749 // Try to use stale nodes if we're out of non-stale nodes, else we're done 750 if (targets.isEmpty()) { 751 if (!stale.isEmpty()) { 752 targets = stale; 753 } else { 754 break; 755 } 756 } 757 // Select a random target 758 DatanodeDescriptor target = 759 chooseRandomDatanodeByRemainingCapacity(targets); 760 chosen.add(target); 761 targets.remove(target); 762 } 763 return chosen; 764 } 765 766 /** 767 * Choose a single datanode from the provided list of possible 768 * targets, weighted by the percentage of free space remaining on the node. 769 * 770 * @return The chosen datanode 771 */ chooseRandomDatanodeByRemainingCapacity( final List<DatanodeDescriptor> targets)772 private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity( 773 final List<DatanodeDescriptor> targets) { 774 // Use a weighted probability to choose the target datanode 775 float total = 0; 776 for (DatanodeDescriptor d : targets) { 777 total += d.getCacheRemainingPercent(); 778 } 779 // Give each datanode a portion of keyspace equal to its relative weight 780 // [0, w1) selects d1, [w1, w2) selects d2, etc. 781 TreeMap<Integer, DatanodeDescriptor> lottery = 782 new TreeMap<Integer, DatanodeDescriptor>(); 783 int offset = 0; 784 for (DatanodeDescriptor d : targets) { 785 // Since we're using floats, be paranoid about negative values 786 int weight = 787 Math.max(1, (int)((d.getCacheRemainingPercent() / total) * 1000000)); 788 offset += weight; 789 lottery.put(offset, d); 790 } 791 // Choose a number from [0, offset), which is the total amount of weight, 792 // to select the winner 793 DatanodeDescriptor winner = 794 lottery.higherEntry(random.nextInt(offset)).getValue(); 795 return winner; 796 } 797 } 798