/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.util.ExitUtil.terminate;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * Scans the namesystem, scheduling blocks to be cached as appropriate.
 *
 * The CacheReplicationMonitor does a full scan when the NameNode first
 * starts up, and at configurable intervals afterwards.
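 * <p>
 * A rough usage sketch (illustrative only; in practice the CacheManager owns
 * this thread and the shared lock, so the surrounding names are assumptions):
 * <pre>
 *   ReentrantLock crmLock = new ReentrantLock();
 *   CacheReplicationMonitor monitor =
 *       new CacheReplicationMonitor(namesystem, cacheManager, 30000, crmLock);
 *   monitor.start();
 *
 *   // After changing cache directives, request and wait for a rescan:
 *   crmLock.lock();
 *   try {
 *     monitor.setNeedsRescan();
 *     monitor.waitForRescanIfNeeded();
 *   } finally {
 *     crmLock.unlock();
 *   }
 * </pre>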
 */
@InterfaceAudience.LimitedPrivate({"HDFS"})
public class CacheReplicationMonitor extends Thread implements Closeable {

  private static final Logger LOG =
      LoggerFactory.getLogger(CacheReplicationMonitor.class);

  private final FSNamesystem namesystem;

  private final BlockManager blockManager;

  private final CacheManager cacheManager;

  private final GSet<CachedBlock, CachedBlock> cachedBlocks;

  /**
   * Pseudorandom number source
   */
  private static final Random random = new Random();

  /**
   * The interval at which we scan the namesystem for caching changes.
   */
  private final long intervalMs;

  /**
   * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
   * waiting for rescan operations.
   */
  private final ReentrantLock lock;

  /**
   * Notifies the scan thread that an immediate rescan is needed.
   */
  private final Condition doRescan;

  /**
   * Notifies waiting threads that a rescan has finished.
   */
  private final Condition scanFinished;

  /**
   * The number of rescans completed. Used to wait for scans to finish.
   * Protected by the CacheReplicationMonitor lock.
   */
  private long completedScanCount = 0;

  /**
   * The scan we're currently performing, or -1 if no scan is in progress.
   * Protected by the CacheReplicationMonitor lock.
   */
  private long curScanCount = -1;

  /**
   * The number of rescans we need to complete.  Protected by the CRM lock.
   */
  private long neededScanCount = 0;

  /**
   * True if this monitor should terminate. Protected by the CRM lock.
   */
  private boolean shutdown = false;

  /**
   * Mark status of the current scan.
   */
  private boolean mark = false;

  /**
   * Cache directives found in the previous scan.
   */
  private int scannedDirectives;

  /**
   * Blocks found in the previous scan.
   */
  private long scannedBlocks;

  public CacheReplicationMonitor(FSNamesystem namesystem,
      CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
    this.namesystem = namesystem;
    this.blockManager = namesystem.getBlockManager();
    this.cacheManager = cacheManager;
    this.cachedBlocks = cacheManager.getCachedBlocks();
    this.intervalMs = intervalMs;
    this.lock = lock;
    this.doRescan = this.lock.newCondition();
    this.scanFinished = this.lock.newCondition();
  }

  @Override
  public void run() {
    long startTimeMs = 0;
    Thread.currentThread().setName("CacheReplicationMonitor(" +
        System.identityHashCode(this) + ")");
    LOG.info("Starting CacheReplicationMonitor with interval " +
             intervalMs + " milliseconds");
    try {
      long curTimeMs = Time.monotonicNow();
      while (true) {
        lock.lock();
        try {
          while (true) {
            if (shutdown) {
              LOG.debug("Shutting down CacheReplicationMonitor");
              return;
            }
            if (completedScanCount < neededScanCount) {
              LOG.debug("Rescanning because of pending operations");
              break;
            }
            long delta = (startTimeMs + intervalMs) - curTimeMs;
            if (delta <= 0) {
              LOG.debug("Rescanning after {} milliseconds", (curTimeMs - startTimeMs));
              break;
            }
            doRescan.await(delta, TimeUnit.MILLISECONDS);
            curTimeMs = Time.monotonicNow();
          }
        } finally {
          lock.unlock();
        }
        startTimeMs = curTimeMs;
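        // Flip the mark so blocks updated during this scan can be told apart
        // from blocks that were last touched by the previous scan.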
        mark = !mark;
        rescan();
        curTimeMs = Time.monotonicNow();
        // Update synchronization-related variables.
        lock.lock();
        try {
          completedScanCount = curScanCount;
          curScanCount = -1;
          scanFinished.signalAll();
        } finally {
          lock.unlock();
        }
        LOG.debug("Scanned {} directive(s) and {} block(s) in {} millisecond(s).",
            scannedDirectives, scannedBlocks, (curTimeMs - startTimeMs));
      }
    } catch (InterruptedException e) {
      LOG.info("Shutting down CacheReplicationMonitor.");
      return;
    } catch (Throwable t) {
      LOG.error("Thread exiting", t);
      terminate(1, t);
    }
  }

  /**
   * Waits for a rescan to complete. This doesn't guarantee consistency with
   * pending operations, only relative recency, since it will not force a new
   * rescan if a rescan is already underway.
   * <p>
   * Note that this call will release the FSN lock, so operations before and
   * after are not atomic.
   */
  public void waitForRescanIfNeeded() {
    Preconditions.checkArgument(!namesystem.hasWriteLock(),
        "Must not hold the FSN write lock when waiting for a rescan.");
    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
        "Must hold the CRM lock when waiting for a rescan.");
    if (neededScanCount <= completedScanCount) {
      return;
    }
    // If no scan is already ongoing, wake the monitor thread so it starts one
    if (curScanCount < 0) {
      doRescan.signal();
    }
    // Wait until the scan finishes and the count advances
    while ((!shutdown) && (completedScanCount < neededScanCount)) {
      try {
        scanFinished.await();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
            + " rescan", e);
        break;
      }
    }
  }

  /**
   * Indicates to the CacheReplicationMonitor that there have been CacheManager
   * changes that require a rescan.
   */
  public void setNeedsRescan() {
    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
        "Must hold the CRM lock when setting the needsRescan bit.");
    if (curScanCount >= 0) {
      // If there is a scan in progress, we need to wait for the scan after
      // that.
      neededScanCount = curScanCount + 1;
    } else {
      // If there is no scan in progress, we need to wait for the next scan.
      neededScanCount = completedScanCount + 1;
    }
  }

  /**
   * Shut down the monitor thread.
   */
  @Override
  public void close() throws IOException {
    Preconditions.checkArgument(namesystem.hasWriteLock());
    lock.lock();
    try {
      if (shutdown) return;
      // Since we hold both the FSN write lock and the CRM lock here,
      // we know that the CRM thread cannot be currently modifying
      // the cache manager state while we're closing it.
      // Since the CRM thread checks the value of 'shutdown' after waiting
      // for a lock, we know that the thread will not modify the cache
      // manager state after this point.
      shutdown = true;
      doRescan.signalAll();
      scanFinished.signalAll();
    } finally {
      lock.unlock();
    }
  }

  private void rescan() throws InterruptedException {
    scannedDirectives = 0;
    scannedBlocks = 0;
    try {
      namesystem.writeLock();
      try {
        lock.lock();
        if (shutdown) {
          throw new InterruptedException("CacheReplicationMonitor was " +
              "shut down.");
        }
        curScanCount = completedScanCount + 1;
      } finally {
        lock.unlock();
      }

      resetStatistics();
      rescanCacheDirectives();
      rescanCachedBlockMap();
      blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
    } finally {
      namesystem.writeUnlock();
    }
  }

  private void resetStatistics() {
    for (CachePool pool: cacheManager.getCachePools()) {
      pool.resetStatistics();
    }
    for (CacheDirective directive: cacheManager.getCacheDirectives()) {
      directive.resetStatistics();
    }
  }

  /**
   * Scan all CacheDirectives.  Use the information to figure out
   * what cache replication factor each block should have.
   */
  private void rescanCacheDirectives() {
    FSDirectory fsDir = namesystem.getFSDirectory();
    final long now = new Date().getTime();
    for (CacheDirective directive : cacheManager.getCacheDirectives()) {
      scannedDirectives++;
      // Skip processing this entry if it has expired
      if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
        LOG.debug("Directive {}: the directive expired at {} (now = {})",
             directive.getId(), directive.getExpiryTime(), now);
        continue;
      }
      String path = directive.getPath();
      INode node;
      try {
        node = fsDir.getINode(path);
      } catch (UnresolvedLinkException e) {
        // We don't cache through symlinks
        LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
                + "path {}", directive.getId(), path
        );
        continue;
      }
      if (node == null)  {
        LOG.debug("Directive {}: No inode found at {}", directive.getId(),
            path);
      } else if (node.isDirectory()) {
        INodeDirectory dir = node.asDirectory();
        ReadOnlyList<INode> children = dir
            .getChildrenList(Snapshot.CURRENT_STATE_ID);
        for (INode child : children) {
          if (child.isFile()) {
            rescanFile(directive, child.asFile());
          }
        }
      } else if (node.isFile()) {
        rescanFile(directive, node.asFile());
      } else {
        LOG.debug("Directive {}: ignoring non-directory, non-file inode {} ",
            directive.getId(), node);
      }
    }
  }

  /**
   * Apply a CacheDirective to a file.
   *
   * @param directive The CacheDirective to apply.
   * @param file The file.
   */
  private void rescanFile(CacheDirective directive, INodeFile file) {
    BlockInfoContiguous[] blockInfos = file.getBlocks();

    // Increment the "needed" statistics
    directive.addFilesNeeded(1);
    // We don't cache under-construction blocks, so don't add them to the
    // total here.
    long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() *
        directive.getReplication();
    directive.addBytesNeeded(neededTotal);

    // The pool's bytesNeeded is incremented as we scan. If the demand
    // thus far plus the demand of this file would exceed the pool's limit,
    // do not cache this file.
    CachePool pool = directive.getPool();
    if (pool.getBytesNeeded() > pool.getLimit()) {
      LOG.debug("Directive {}: not scanning file {} because " +
          "bytesNeeded for pool {} is {}, but the pool's limit is {}",
          directive.getId(),
          file.getFullPathName(),
          pool.getPoolName(),
          pool.getBytesNeeded(),
          pool.getLimit());
      return;
    }

    long cachedTotal = 0;
    for (BlockInfoContiguous blockInfo : blockInfos) {
      if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
        // We don't try to cache blocks that are under construction.
        LOG.trace("Directive {}: can't cache block {} because it is in state "
                + "{}, not COMPLETE.", directive.getId(), blockInfo,
            blockInfo.getBlockUCState()
        );
        continue;
      }
      Block block = new Block(blockInfo.getBlockId());
      CachedBlock ncblock = new CachedBlock(block.getBlockId(),
          directive.getReplication(), mark);
      CachedBlock ocblock = cachedBlocks.get(ncblock);
      if (ocblock == null) {
        cachedBlocks.put(ncblock);
        ocblock = ncblock;
      } else {
        // Update bytesUsed using the current replication levels.
        // Assumptions: we assume that all the blocks are the same length
        // on each datanode.  We can assume this because we're only caching
        // blocks in state COMPLETE.
        // Note that if two directives are caching the same block(s), they will
        // both get them added to their bytesCached.
        List<DatanodeDescriptor> cachedOn =
            ocblock.getDatanodes(Type.CACHED);
        long cachedByBlock = Math.min(cachedOn.size(),
            directive.getReplication()) * blockInfo.getNumBytes();
        cachedTotal += cachedByBlock;

        if ((mark != ocblock.getMark()) ||
            (ocblock.getReplication() < directive.getReplication())) {
          //
          // Overwrite the block's replication and mark in two cases:
          //
          // 1. If the mark on the CachedBlock is different from the mark for
          // this scan, that means the block hasn't been updated during this
          // scan, and we should overwrite whatever is there, since it is no
          // longer valid.
          //
          // 2. If the replication in the CachedBlock is less than what the
          // directive asks for, we want to increase the block's replication
          // field to what the directive asks for.
          //
          ocblock.setReplicationAndMark(directive.getReplication(), mark);
        }
      }
      LOG.trace("Directive {}: setting replication for block {} to {}",
          directive.getId(), blockInfo, ocblock.getReplication());
    }
    // Increment the "cached" statistics
    directive.addBytesCached(cachedTotal);
    if (cachedTotal == neededTotal) {
      directive.addFilesCached(1);
    }
    LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(),
        file.getFullPathName(), cachedTotal, neededTotal);
  }

  private String findReasonForNotCaching(CachedBlock cblock,
          BlockInfoContiguous blockInfo) {
    if (blockInfo == null) {
      // Somehow, a cache report with the block arrived, but the block
      // reports from the DataNode haven't (yet?) described such a block.
      // Alternately, the NameNode might have invalidated the block, but the
      // DataNode hasn't caught up.  In any case, we want to tell the DN
      // to uncache this.
      return "not tracked by the BlockManager";
    } else if (!blockInfo.isComplete()) {
      // When a cached block changes state from complete to some other state
      // on the DataNode (perhaps because of append), it will begin the
      // uncaching process.  However, the uncaching process is not
      // instantaneous, especially if clients have pinned the block.  So
      // there may be a period of time when incomplete blocks remain cached
      // on the DataNodes.
      return "not complete";
    } else if (cblock.getReplication() == 0) {
      // Since 0 is not a valid value for a cache directive's replication
      // field, seeing a replication of 0 on a CachedBlock means that it
      // has never been reached by any sweep.
      return "not needed by any directives";
    } else if (cblock.getMark() != mark) {
      // Although the block was needed in the past, we didn't reach it during
      // the current sweep.  Therefore, it doesn't need to be cached any more.
      // Need to set the replication to 0 so it doesn't flip back to cached
      // when the mark flips on the next scan
      cblock.setReplicationAndMark((short)0, mark);
      return "no longer needed by any directives";
    }
    return null;
  }

  /**
   * Scan through the cached block map.
   * Any blocks which are under-replicated should be assigned new Datanodes.
   * Blocks that are over-replicated should be removed from Datanodes.
   */
  private void rescanCachedBlockMap() {
    for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
        cbIter.hasNext(); ) {
      scannedBlocks++;
      CachedBlock cblock = cbIter.next();
      List<DatanodeDescriptor> pendingCached =
          cblock.getDatanodes(Type.PENDING_CACHED);
      List<DatanodeDescriptor> cached =
          cblock.getDatanodes(Type.CACHED);
      List<DatanodeDescriptor> pendingUncached =
          cblock.getDatanodes(Type.PENDING_UNCACHED);
      // Remove nodes from PENDING_UNCACHED if they were actually uncached.
      for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
          iter.hasNext(); ) {
        DatanodeDescriptor datanode = iter.next();
        if (!cblock.isInList(datanode.getCached())) {
          LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
              + "because the DataNode uncached it.", cblock.getBlockId(),
              datanode.getDatanodeUuid());
          datanode.getPendingUncached().remove(cblock);
          iter.remove();
        }
      }
      BlockInfoContiguous blockInfo = blockManager.
            getStoredBlock(new Block(cblock.getBlockId()));
      String reason = findReasonForNotCaching(cblock, blockInfo);
      int neededCached = 0;
      if (reason != null) {
        LOG.trace("Block {}: can't cache block because it is {}",
            cblock.getBlockId(), reason);
      } else {
        neededCached = cblock.getReplication();
      }
      int numCached = cached.size();
      if (numCached >= neededCached) {
        // If we have enough replicas, drop all pending cached.
        for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator();
            iter.hasNext(); ) {
          DatanodeDescriptor datanode = iter.next();
          datanode.getPendingCached().remove(cblock);
          iter.remove();
          LOG.trace("Block {}: removing from PENDING_CACHED for node {} "
                  + "because we already have {} cached replicas and we only" +
                  " need {}",
              cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
              neededCached
          );
        }
      }
      if (numCached < neededCached) {
        // If we don't have enough replicas, drop all pending uncached.
        for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
            iter.hasNext(); ) {
          DatanodeDescriptor datanode = iter.next();
          datanode.getPendingUncached().remove(cblock);
          iter.remove();
          LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
                  + "because we only have {} cached replicas and we need " +
                  "{}", cblock.getBlockId(), datanode.getDatanodeUuid(),
              numCached, neededCached
          );
        }
      }
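      // Schedule uncaching for any replicas beyond what the directives need,
      // excluding replicas that are already pending uncache; otherwise, see
      // whether additional caching needs to be scheduled.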
      int neededUncached = numCached -
          (pendingUncached.size() + neededCached);
      if (neededUncached > 0) {
        addNewPendingUncached(neededUncached, cblock, cached,
            pendingUncached);
      } else {
        int additionalCachedNeeded = neededCached -
            (numCached + pendingCached.size());
        if (additionalCachedNeeded > 0) {
          addNewPendingCached(additionalCachedNeeded, cblock, cached,
              pendingCached);
        }
      }
      if ((neededCached == 0) &&
          pendingUncached.isEmpty() &&
          pendingCached.isEmpty()) {
        // we have nothing more to do with this block.
        LOG.trace("Block {}: removing from cachedBlocks, since neededCached "
                + "== 0, and pendingUncached and pendingCached are empty.",
            cblock.getBlockId()
        );
        cbIter.remove();
      }
    }
  }

  /**
   * Add new entries to the PendingUncached list.
   *
   * @param neededUncached   The number of replicas that need to be uncached.
   * @param cachedBlock      The block which needs to be uncached.
   * @param cached           A list of DataNodes currently caching the block.
   * @param pendingUncached  A list of DataNodes that will soon uncache the
   *                         block.
   */
  private void addNewPendingUncached(int neededUncached,
      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
      List<DatanodeDescriptor> pendingUncached) {
    // Figure out which replicas can be uncached.
    LinkedList<DatanodeDescriptor> possibilities =
        new LinkedList<DatanodeDescriptor>();
    for (DatanodeDescriptor datanode : cached) {
      if (!pendingUncached.contains(datanode)) {
        possibilities.add(datanode);
      }
    }
    while (neededUncached > 0) {
      if (possibilities.isEmpty()) {
        LOG.warn("Logic error: we're trying to uncache more replicas than " +
            "actually exist for " + cachedBlock);
        return;
      }
      DatanodeDescriptor datanode =
        possibilities.remove(random.nextInt(possibilities.size()));
      pendingUncached.add(datanode);
      boolean added = datanode.getPendingUncached().add(cachedBlock);
      assert added;
      neededUncached--;
    }
  }

  /**
   * Add new entries to the PendingCached list.
   *
   * @param neededCached     The number of replicas that need to be cached.
   * @param cachedBlock      The block which needs to be cached.
   * @param cached           A list of DataNodes currently caching the block.
   * @param pendingCached    A list of DataNodes that will soon cache the
   *                         block.
   */
  private void addNewPendingCached(final int neededCached,
      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
      List<DatanodeDescriptor> pendingCached) {
    // To figure out which replicas can be cached, we consult the
    // blocksMap.  We don't want to try to cache a corrupt replica, though.
    BlockInfoContiguous blockInfo = blockManager.
          getStoredBlock(new Block(cachedBlock.getBlockId()));
    if (blockInfo == null) {
      LOG.debug("Block {}: can't add new cached replicas," +
          " because there is no record of this block " +
          "on the NameNode.", cachedBlock.getBlockId());
      return;
    }
    if (!blockInfo.isComplete()) {
      LOG.debug("Block {}: can't cache this block, because it is not yet"
          + " complete.", cachedBlock.getBlockId());
      return;
    }
    // Filter the list of replicas to only the valid targets
    List<DatanodeDescriptor> possibilities =
        new LinkedList<DatanodeDescriptor>();
    int numReplicas = blockInfo.getCapacity();
    Collection<DatanodeDescriptor> corrupt =
        blockManager.getCorruptReplicas(blockInfo);
    int outOfCapacity = 0;
    for (int i = 0; i < numReplicas; i++) {
      DatanodeDescriptor datanode = blockInfo.getDatanode(i);
      if (datanode == null) {
        continue;
      }
      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
        continue;
      }
      if (corrupt != null && corrupt.contains(datanode)) {
        continue;
      }
      if (pendingCached.contains(datanode) || cached.contains(datanode)) {
        continue;
      }
      long pendingBytes = 0;
      // Subtract pending cached blocks from effective capacity
      Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
      while (it.hasNext()) {
        CachedBlock cBlock = it.next();
        BlockInfoContiguous info =
            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
        if (info != null) {
          pendingBytes -= info.getNumBytes();
        }
      }
      it = datanode.getPendingUncached().iterator();
      // Add back pending uncached blocks to the effective capacity
      while (it.hasNext()) {
        CachedBlock cBlock = it.next();
        BlockInfoContiguous info =
            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
        if (info != null) {
          pendingBytes += info.getNumBytes();
        }
      }
      long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
      if (pendingCapacity < blockInfo.getNumBytes()) {
        LOG.trace("Block {}: DataNode {} is not a valid possibility " +
            "because the block has size {}, but the DataNode only has {} " +
            "bytes of cache remaining ({} pending bytes, {} already cached).",
            blockInfo.getBlockId(), datanode.getDatanodeUuid(),
            blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
            datanode.getCacheRemaining());
        outOfCapacity++;
        continue;
      }
      possibilities.add(datanode);
    }
    List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
        neededCached, blockManager.getDatanodeManager().getStaleInterval());
    for (DatanodeDescriptor datanode : chosen) {
      LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
          blockInfo.getBlockId(), datanode.getDatanodeUuid());
      pendingCached.add(datanode);
      boolean added = datanode.getPendingCached().add(cachedBlock);
      assert added;
    }
    // We were unable to satisfy the requested replication factor
    if (neededCached > chosen.size()) {
      LOG.debug("Block {}: we only have {} of {} cached replicas."
              + " {} DataNodes have insufficient cache capacity.",
          blockInfo.getBlockId(),
          (cachedBlock.getReplication() - neededCached + chosen.size()),
          cachedBlock.getReplication(), outOfCapacity
      );
    }
  }

  /**
   * Chooses datanode locations for caching from a list of valid possibilities.
   * Non-stale nodes are chosen before stale nodes.
   *
   * @param possibilities List of candidate datanodes
   * @param neededCached Number of replicas needed
   * @param staleInterval Age of a stale datanode
   * @return A list of chosen datanodes
   */
  private static List<DatanodeDescriptor> chooseDatanodesForCaching(
      final List<DatanodeDescriptor> possibilities, final int neededCached,
      final long staleInterval) {
    // Make a copy that we can modify
    List<DatanodeDescriptor> targets =
        new ArrayList<DatanodeDescriptor>(possibilities);
    // Selected targets
    List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();

    // Filter out stale datanodes
    List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
    Iterator<DatanodeDescriptor> it = targets.iterator();
    while (it.hasNext()) {
      DatanodeDescriptor d = it.next();
      if (d.isStale(staleInterval)) {
        it.remove();
        stale.add(d);
      }
    }
    // Select targets
    while (chosen.size() < neededCached) {
      // Try to use stale nodes if we're out of non-stale nodes, else we're done
      if (targets.isEmpty()) {
        if (!stale.isEmpty()) {
          targets = stale;
        } else {
          break;
        }
      }
      // Select a random target
      DatanodeDescriptor target =
          chooseRandomDatanodeByRemainingCapacity(targets);
      chosen.add(target);
      targets.remove(target);
    }
    return chosen;
  }

  /**
   * Choose a single datanode from the provided list of possible
   * targets, weighted by the percentage of cache space remaining on the node.
   *
   * @return The chosen datanode
   */
  private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(
      final List<DatanodeDescriptor> targets) {
    // Use a weighted probability to choose the target datanode
    float total = 0;
    for (DatanodeDescriptor d : targets) {
      total += d.getCacheRemainingPercent();
    }
    // Give each datanode a portion of keyspace equal to its relative weight
    // [0, w1) selects d1, [w1, w2) selects d2, etc.
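    // For example, with remaining percents of 60 and 30, the first node owns
    // roughly two thirds of the keyspace and the second roughly one third.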
    TreeMap<Integer, DatanodeDescriptor> lottery =
        new TreeMap<Integer, DatanodeDescriptor>();
    int offset = 0;
    for (DatanodeDescriptor d : targets) {
      // Since we're using floats, be paranoid about negative values
      int weight =
          Math.max(1, (int)((d.getCacheRemainingPercent() / total) * 1000000));
      offset += weight;
      lottery.put(offset, d);
    }
    // Choose a number from [0, offset), which is the total amount of weight,
    // to select the winner
    DatanodeDescriptor winner =
        lottery.higherEntry(random.nextInt(offset)).getValue();
    return winner;
  }
}