1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.namenode;
19 
20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT;
21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT;
22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
24 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
25 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
26 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
27 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
28 
29 import java.io.DataInput;
30 import java.io.DataOutputStream;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.Date;
36 import java.util.EnumSet;
37 import java.util.Iterator;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map.Entry;
41 import java.util.SortedMap;
42 import java.util.TreeMap;
43 import java.util.concurrent.locks.ReentrantLock;
44 
45 import org.apache.commons.io.IOUtils;
46 import org.apache.hadoop.classification.InterfaceAudience;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
49 import org.apache.hadoop.fs.CacheFlag;
50 import org.apache.hadoop.fs.InvalidRequestException;
51 import org.apache.hadoop.fs.Path;
52 import org.apache.hadoop.fs.UnresolvedLinkException;
53 import org.apache.hadoop.fs.permission.FsAction;
54 import org.apache.hadoop.fs.permission.FsPermission;
55 import org.apache.hadoop.hdfs.DFSUtil;
56 import org.apache.hadoop.hdfs.protocol.CacheDirective;
57 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
58 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
59 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
60 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
61 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
62 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
63 import org.apache.hadoop.hdfs.protocol.DatanodeID;
64 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
65 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
66 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
67 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
68 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
69 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
70 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
71 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
72 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
73 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
74 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
75 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
76 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
77 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
78 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
79 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
80 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
81 import org.apache.hadoop.hdfs.util.ReadOnlyList;
82 import org.apache.hadoop.security.AccessControlException;
83 import org.apache.hadoop.util.GSet;
84 import org.apache.hadoop.util.LightWeightGSet;
85 import org.apache.hadoop.util.Time;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88 
89 import com.google.common.annotations.VisibleForTesting;
90 import com.google.common.collect.Lists;
91 
92 /**
93  * The Cache Manager handles caching on DataNodes.
94  *
95  * This class is instantiated by the FSNamesystem.
96  * It maintains the mapping of cached blocks to datanodes via processing
97  * datanode cache reports. Based on these reports and addition and removal of
98  * caching directives, we will schedule caching and uncaching work.
99  */
100 @InterfaceAudience.LimitedPrivate({"HDFS"})
101 public final class CacheManager {
102   public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
103 
104   private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
105 
106   // TODO: add pending / underCached / schedule cached blocks stats.
107 
108   /**
109    * The FSNamesystem that contains this CacheManager.
110    */
111   private final FSNamesystem namesystem;
112 
113   /**
114    * The BlockManager associated with the FSN that owns this CacheManager.
115    */
116   private final BlockManager blockManager;
117 
118   /**
119    * Cache directives, sorted by ID.
120    *
121    * listCacheDirectives relies on the ordering of elements in this map
122    * to track what has already been listed by the client.
123    */
124   private final TreeMap<Long, CacheDirective> directivesById =
125       new TreeMap<Long, CacheDirective>();
126 
127   /**
128    * The directive ID to use for a new directive.  IDs always increase, and are
129    * never reused.
130    */
131   private long nextDirectiveId;
132 
133   /**
134    * Cache directives, sorted by path
135    */
136   private final TreeMap<String, List<CacheDirective>> directivesByPath =
137       new TreeMap<String, List<CacheDirective>>();
138 
139   /**
140    * Cache pools, sorted by name.
141    */
142   private final TreeMap<String, CachePool> cachePools =
143       new TreeMap<String, CachePool>();
144 
145   /**
146    * Maximum number of cache pools to list in one operation.
147    */
148   private final int maxListCachePoolsResponses;
149 
150   /**
151    * Maximum number of cache pool directives to list in one operation.
152    */
153   private final int maxListCacheDirectivesNumResponses;
154 
155   /**
156    * Interval between scans in milliseconds.
157    */
158   private final long scanIntervalMs;
159 
160   /**
161    * All cached blocks.
162    */
163   private final GSet<CachedBlock, CachedBlock> cachedBlocks;
164 
165   /**
166    * Lock which protects the CacheReplicationMonitor.
167    */
168   private final ReentrantLock crmLock = new ReentrantLock();
169 
170   private final SerializerCompat serializerCompat = new SerializerCompat();
171 
172   /**
173    * The CacheReplicationMonitor.
174    */
175   private CacheReplicationMonitor monitor;
176 
177   public static final class PersistState {
178     public final CacheManagerSection section;
179     public final List<CachePoolInfoProto> pools;
180     public final List<CacheDirectiveInfoProto> directives;
181 
PersistState(CacheManagerSection section, List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives)182     public PersistState(CacheManagerSection section,
183         List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) {
184       this.section = section;
185       this.pools = pools;
186       this.directives = directives;
187     }
188   }
189 
CacheManager(FSNamesystem namesystem, Configuration conf, BlockManager blockManager)190   CacheManager(FSNamesystem namesystem, Configuration conf,
191       BlockManager blockManager) {
192     this.namesystem = namesystem;
193     this.blockManager = blockManager;
194     this.nextDirectiveId = 1;
195     this.maxListCachePoolsResponses = conf.getInt(
196         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
197         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
198     this.maxListCacheDirectivesNumResponses = conf.getInt(
199         DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
200         DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT);
201     scanIntervalMs = conf.getLong(
202         DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
203         DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
204     float cachedBlocksPercent = conf.getFloat(
205           DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
206           DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
207     if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
208       LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT,
209         DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
210       cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
211     }
212     this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
213           LightWeightGSet.computeCapacity(cachedBlocksPercent,
214               "cachedBlocks"));
215 
216   }
217 
218   /**
219    * Resets all tracked directives and pools. Called during 2NN checkpointing to
220    * reset FSNamesystem state. See {@link FSNamesystem#clear()}.
221    */
clear()222   void clear() {
223     directivesById.clear();
224     directivesByPath.clear();
225     cachePools.clear();
226     nextDirectiveId = 1;
227   }
228 
startMonitorThread()229   public void startMonitorThread() {
230     crmLock.lock();
231     try {
232       if (this.monitor == null) {
233         this.monitor = new CacheReplicationMonitor(namesystem, this,
234             scanIntervalMs, crmLock);
235         this.monitor.start();
236       }
237     } finally {
238       crmLock.unlock();
239     }
240   }
241 
stopMonitorThread()242   public void stopMonitorThread() {
243     crmLock.lock();
244     try {
245       if (this.monitor != null) {
246         CacheReplicationMonitor prevMonitor = this.monitor;
247         this.monitor = null;
248         IOUtils.closeQuietly(prevMonitor);
249       }
250     } finally {
251       crmLock.unlock();
252     }
253   }
254 
clearDirectiveStats()255   public void clearDirectiveStats() {
256     assert namesystem.hasWriteLock();
257     for (CacheDirective directive : directivesById.values()) {
258       directive.resetStatistics();
259     }
260   }
261 
262   /**
263    * @return Unmodifiable view of the collection of CachePools.
264    */
getCachePools()265   public Collection<CachePool> getCachePools() {
266     assert namesystem.hasReadLock();
267     return Collections.unmodifiableCollection(cachePools.values());
268   }
269 
270   /**
271    * @return Unmodifiable view of the collection of CacheDirectives.
272    */
getCacheDirectives()273   public Collection<CacheDirective> getCacheDirectives() {
274     assert namesystem.hasReadLock();
275     return Collections.unmodifiableCollection(directivesById.values());
276   }
277 
278   @VisibleForTesting
getCachedBlocks()279   public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
280     assert namesystem.hasReadLock();
281     return cachedBlocks;
282   }
283 
getNextDirectiveId()284   private long getNextDirectiveId() throws IOException {
285     assert namesystem.hasWriteLock();
286     if (nextDirectiveId >= Long.MAX_VALUE - 1) {
287       throw new IOException("No more available IDs.");
288     }
289     return nextDirectiveId++;
290   }
291 
292   // Helper getter / validation methods
293 
checkWritePermission(FSPermissionChecker pc, CachePool pool)294   private static void checkWritePermission(FSPermissionChecker pc,
295       CachePool pool) throws AccessControlException {
296     if ((pc != null)) {
297       pc.checkPermission(pool, FsAction.WRITE);
298     }
299   }
300 
validatePoolName(CacheDirectiveInfo directive)301   private static String validatePoolName(CacheDirectiveInfo directive)
302       throws InvalidRequestException {
303     String pool = directive.getPool();
304     if (pool == null) {
305       throw new InvalidRequestException("No pool specified.");
306     }
307     if (pool.isEmpty()) {
308       throw new InvalidRequestException("Invalid empty pool name.");
309     }
310     return pool;
311   }
312 
validatePath(CacheDirectiveInfo directive)313   private static String validatePath(CacheDirectiveInfo directive)
314       throws InvalidRequestException {
315     if (directive.getPath() == null) {
316       throw new InvalidRequestException("No path specified.");
317     }
318     String path = directive.getPath().toUri().getPath();
319     if (!DFSUtil.isValidName(path)) {
320       throw new InvalidRequestException("Invalid path '" + path + "'.");
321     }
322     return path;
323   }
324 
validateReplication(CacheDirectiveInfo directive, short defaultValue)325   private static short validateReplication(CacheDirectiveInfo directive,
326       short defaultValue) throws InvalidRequestException {
327     short repl = (directive.getReplication() != null)
328         ? directive.getReplication() : defaultValue;
329     if (repl <= 0) {
330       throw new InvalidRequestException("Invalid replication factor " + repl
331           + " <= 0");
332     }
333     return repl;
334   }
335 
336   /**
337    * Calculates the absolute expiry time of the directive from the
338    * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
339    * into an absolute time based on the local clock.
340    *
341    * @param info to validate.
342    * @param maxRelativeExpiryTime of the info's pool.
343    * @return the expiration time, or the pool's max absolute expiration if the
344    *         info's expiration was not set.
345    * @throws InvalidRequestException if the info's Expiration is invalid.
346    */
validateExpiryTime(CacheDirectiveInfo info, long maxRelativeExpiryTime)347   private static long validateExpiryTime(CacheDirectiveInfo info,
348       long maxRelativeExpiryTime) throws InvalidRequestException {
349     LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info,
350         maxRelativeExpiryTime);
351     final long now = new Date().getTime();
352     final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
353     if (info == null || info.getExpiration() == null) {
354       return maxAbsoluteExpiryTime;
355     }
356     Expiration expiry = info.getExpiration();
357     if (expiry.getMillis() < 0l) {
358       throw new InvalidRequestException("Cannot set a negative expiration: "
359           + expiry.getMillis());
360     }
361     long relExpiryTime, absExpiryTime;
362     if (expiry.isRelative()) {
363       relExpiryTime = expiry.getMillis();
364       absExpiryTime = now + relExpiryTime;
365     } else {
366       absExpiryTime = expiry.getMillis();
367       relExpiryTime = absExpiryTime - now;
368     }
369     // Need to cap the expiry so we don't overflow a long when doing math
370     if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) {
371       throw new InvalidRequestException("Expiration "
372           + expiry.toString() + " is too far in the future!");
373     }
374     // Fail if the requested expiry is greater than the max
375     if (relExpiryTime > maxRelativeExpiryTime) {
376       throw new InvalidRequestException("Expiration " + expiry.toString()
377           + " exceeds the max relative expiration time of "
378           + maxRelativeExpiryTime + " ms.");
379     }
380     return absExpiryTime;
381   }
382 
383   /**
384    * Throws an exception if the CachePool does not have enough capacity to
385    * cache the given path at the replication factor.
386    *
387    * @param pool CachePool where the path is being cached
388    * @param path Path that is being cached
389    * @param replication Replication factor of the path
390    * @throws InvalidRequestException if the pool does not have enough capacity
391    */
checkLimit(CachePool pool, String path, short replication)392   private void checkLimit(CachePool pool, String path,
393       short replication) throws InvalidRequestException {
394     CacheDirectiveStats stats = computeNeeded(path, replication);
395     if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) {
396       return;
397     }
398     if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
399         .getLimit()) {
400       throw new InvalidRequestException("Caching path " + path + " of size "
401           + stats.getBytesNeeded() / replication + " bytes at replication "
402           + replication + " would exceed pool " + pool.getPoolName()
403           + "'s remaining capacity of "
404           + (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
405     }
406   }
407 
408   /**
409    * Computes the needed number of bytes and files for a path.
410    * @return CacheDirectiveStats describing the needed stats for this path
411    */
computeNeeded(String path, short replication)412   private CacheDirectiveStats computeNeeded(String path, short replication) {
413     FSDirectory fsDir = namesystem.getFSDirectory();
414     INode node;
415     long requestedBytes = 0;
416     long requestedFiles = 0;
417     CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
418     try {
419       node = fsDir.getINode(path);
420     } catch (UnresolvedLinkException e) {
421       // We don't cache through symlinks
422       return builder.build();
423     }
424     if (node == null) {
425       return builder.build();
426     }
427     if (node.isFile()) {
428       requestedFiles = 1;
429       INodeFile file = node.asFile();
430       requestedBytes = file.computeFileSize();
431     } else if (node.isDirectory()) {
432       INodeDirectory dir = node.asDirectory();
433       ReadOnlyList<INode> children = dir
434           .getChildrenList(Snapshot.CURRENT_STATE_ID);
435       requestedFiles = children.size();
436       for (INode child : children) {
437         if (child.isFile()) {
438           requestedBytes += child.asFile().computeFileSize();
439         }
440       }
441     }
442     return new CacheDirectiveStats.Builder()
443         .setBytesNeeded(requestedBytes)
444         .setFilesCached(requestedFiles)
445         .build();
446   }
447 
448   /**
449    * Get a CacheDirective by ID, validating the ID and that the directive
450    * exists.
451    */
getById(long id)452   private CacheDirective getById(long id) throws InvalidRequestException {
453     // Check for invalid IDs.
454     if (id <= 0) {
455       throw new InvalidRequestException("Invalid negative ID.");
456     }
457     // Find the directive.
458     CacheDirective directive = directivesById.get(id);
459     if (directive == null) {
460       throw new InvalidRequestException("No directive with ID " + id
461           + " found.");
462     }
463     return directive;
464   }
465 
466   /**
467    * Get a CachePool by name, validating that it exists.
468    */
getCachePool(String poolName)469   private CachePool getCachePool(String poolName)
470       throws InvalidRequestException {
471     CachePool pool = cachePools.get(poolName);
472     if (pool == null) {
473       throw new InvalidRequestException("Unknown pool " + poolName);
474     }
475     return pool;
476   }
477 
478   // RPC handlers
479 
addInternal(CacheDirective directive, CachePool pool)480   private void addInternal(CacheDirective directive, CachePool pool) {
481     boolean addedDirective = pool.getDirectiveList().add(directive);
482     assert addedDirective;
483     directivesById.put(directive.getId(), directive);
484     String path = directive.getPath();
485     List<CacheDirective> directives = directivesByPath.get(path);
486     if (directives == null) {
487       directives = new ArrayList<CacheDirective>(1);
488       directivesByPath.put(path, directives);
489     }
490     directives.add(directive);
491     // Fix up pool stats
492     CacheDirectiveStats stats =
493         computeNeeded(directive.getPath(), directive.getReplication());
494     directive.addBytesNeeded(stats.getBytesNeeded());
495     directive.addFilesNeeded(directive.getFilesNeeded());
496 
497     setNeedsRescan();
498   }
499 
500   /**
501    * Adds a directive, skipping most error checking. This should only be called
502    * internally in special scenarios like edit log replay.
503    */
addDirectiveFromEditLog(CacheDirectiveInfo directive)504   CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
505       throws InvalidRequestException {
506     long id = directive.getId();
507     CacheDirective entry = new CacheDirective(directive);
508     CachePool pool = cachePools.get(directive.getPool());
509     addInternal(entry, pool);
510     if (nextDirectiveId <= id) {
511       nextDirectiveId = id + 1;
512     }
513     return entry.toInfo();
514   }
515 
addDirective( CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)516   public CacheDirectiveInfo addDirective(
517       CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
518       throws IOException {
519     assert namesystem.hasWriteLock();
520     CacheDirective directive;
521     try {
522       CachePool pool = getCachePool(validatePoolName(info));
523       checkWritePermission(pc, pool);
524       String path = validatePath(info);
525       short replication = validateReplication(info, (short)1);
526       long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
527       // Do quota validation if required
528       if (!flags.contains(CacheFlag.FORCE)) {
529         checkLimit(pool, path, replication);
530       }
531       // All validation passed
532       // Add a new entry with the next available ID.
533       long id = getNextDirectiveId();
534       directive = new CacheDirective(id, path, replication, expiryTime);
535       addInternal(directive, pool);
536     } catch (IOException e) {
537       LOG.warn("addDirective of " + info + " failed: ", e);
538       throw e;
539     }
540     LOG.info("addDirective of {} successful.", info);
541     return directive.toInfo();
542   }
543 
544   /**
545    * Factory method that makes a new CacheDirectiveInfo by applying fields in a
546    * CacheDirectiveInfo to an existing CacheDirective.
547    *
548    * @param info with some or all fields set.
549    * @param defaults directive providing default values for unset fields in
550    *          info.
551    *
552    * @return new CacheDirectiveInfo of the info applied to the defaults.
553    */
createFromInfoAndDefaults( CacheDirectiveInfo info, CacheDirective defaults)554   private static CacheDirectiveInfo createFromInfoAndDefaults(
555       CacheDirectiveInfo info, CacheDirective defaults) {
556     // Initialize the builder with the default values
557     CacheDirectiveInfo.Builder builder =
558         new CacheDirectiveInfo.Builder(defaults.toInfo());
559     // Replace default with new value if present
560     if (info.getPath() != null) {
561       builder.setPath(info.getPath());
562     }
563     if (info.getReplication() != null) {
564       builder.setReplication(info.getReplication());
565     }
566     if (info.getPool() != null) {
567       builder.setPool(info.getPool());
568     }
569     if (info.getExpiration() != null) {
570       builder.setExpiration(info.getExpiration());
571     }
572     return builder.build();
573   }
574 
575   /**
576    * Modifies a directive, skipping most error checking. This is for careful
577    * internal use only. modifyDirective can be non-deterministic since its error
578    * checking depends on current system time, which poses a problem for edit log
579    * replay.
580    */
modifyDirectiveFromEditLog(CacheDirectiveInfo info)581   void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
582       throws InvalidRequestException {
583     // Check for invalid IDs.
584     Long id = info.getId();
585     if (id == null) {
586       throw new InvalidRequestException("Must supply an ID.");
587     }
588     CacheDirective prevEntry = getById(id);
589     CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry);
590     removeInternal(prevEntry);
591     addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool()));
592   }
593 
modifyDirective(CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)594   public void modifyDirective(CacheDirectiveInfo info,
595       FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
596     assert namesystem.hasWriteLock();
597     String idString =
598         (info.getId() == null) ?
599             "(null)" : info.getId().toString();
600     try {
601       // Check for invalid IDs.
602       Long id = info.getId();
603       if (id == null) {
604         throw new InvalidRequestException("Must supply an ID.");
605       }
606       CacheDirective prevEntry = getById(id);
607       checkWritePermission(pc, prevEntry.getPool());
608 
609       // Fill in defaults
610       CacheDirectiveInfo infoWithDefaults =
611           createFromInfoAndDefaults(info, prevEntry);
612       CacheDirectiveInfo.Builder builder =
613           new CacheDirectiveInfo.Builder(infoWithDefaults);
614 
615       // Do validation
616       validatePath(infoWithDefaults);
617       validateReplication(infoWithDefaults, (short)-1);
618       // Need to test the pool being set here to avoid rejecting a modify for a
619       // directive that's already been forced into a pool
620       CachePool srcPool = prevEntry.getPool();
621       CachePool destPool = getCachePool(validatePoolName(infoWithDefaults));
622       if (!srcPool.getPoolName().equals(destPool.getPoolName())) {
623         checkWritePermission(pc, destPool);
624         if (!flags.contains(CacheFlag.FORCE)) {
625           checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(),
626               infoWithDefaults.getReplication());
627         }
628       }
629       // Verify the expiration against the destination pool
630       validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
631 
632       // Indicate changes to the CRM
633       setNeedsRescan();
634 
635       // Validation passed
636       removeInternal(prevEntry);
637       addInternal(new CacheDirective(builder.build()), destPool);
638     } catch (IOException e) {
639       LOG.warn("modifyDirective of " + idString + " failed: ", e);
640       throw e;
641     }
642     LOG.info("modifyDirective of {} successfully applied {}.", idString, info);
643   }
644 
removeInternal(CacheDirective directive)645   private void removeInternal(CacheDirective directive)
646       throws InvalidRequestException {
647     assert namesystem.hasWriteLock();
648     // Remove the corresponding entry in directivesByPath.
649     String path = directive.getPath();
650     List<CacheDirective> directives = directivesByPath.get(path);
651     if (directives == null || !directives.remove(directive)) {
652       throw new InvalidRequestException("Failed to locate entry " +
653           directive.getId() + " by path " + directive.getPath());
654     }
655     if (directives.size() == 0) {
656       directivesByPath.remove(path);
657     }
658     // Fix up the stats from removing the pool
659     final CachePool pool = directive.getPool();
660     directive.addBytesNeeded(-directive.getBytesNeeded());
661     directive.addFilesNeeded(-directive.getFilesNeeded());
662 
663     directivesById.remove(directive.getId());
664     pool.getDirectiveList().remove(directive);
665     assert directive.getPool() == null;
666 
667     setNeedsRescan();
668   }
669 
removeDirective(long id, FSPermissionChecker pc)670   public void removeDirective(long id, FSPermissionChecker pc)
671       throws IOException {
672     assert namesystem.hasWriteLock();
673     try {
674       CacheDirective directive = getById(id);
675       checkWritePermission(pc, directive.getPool());
676       removeInternal(directive);
677     } catch (IOException e) {
678       LOG.warn("removeDirective of " + id + " failed: ", e);
679       throw e;
680     }
681     LOG.info("removeDirective of " + id + " successful.");
682   }
683 
684   public BatchedListEntries<CacheDirectiveEntry>
listCacheDirectives(long prevId, CacheDirectiveInfo filter, FSPermissionChecker pc)685         listCacheDirectives(long prevId,
686             CacheDirectiveInfo filter,
687             FSPermissionChecker pc) throws IOException {
688     assert namesystem.hasReadLock();
689     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
690     String filterPath = null;
691     if (filter.getPath() != null) {
692       filterPath = validatePath(filter);
693     }
694     if (filter.getReplication() != null) {
695       throw new InvalidRequestException(
696           "Filtering by replication is unsupported.");
697     }
698 
699     // Querying for a single ID
700     final Long id = filter.getId();
701     if (id != null) {
702       if (!directivesById.containsKey(id)) {
703         throw new InvalidRequestException("Did not find requested id " + id);
704       }
705       // Since we use a tailMap on directivesById, setting prev to id-1 gets
706       // us the directive with the id (if present)
707       prevId = id - 1;
708     }
709 
710     ArrayList<CacheDirectiveEntry> replies =
711         new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
712     int numReplies = 0;
713     SortedMap<Long, CacheDirective> tailMap =
714       directivesById.tailMap(prevId + 1);
715     for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
716       if (numReplies >= maxListCacheDirectivesNumResponses) {
717         return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
718       }
719       CacheDirective curDirective = cur.getValue();
720       CacheDirectiveInfo info = cur.getValue().toInfo();
721 
722       // If the requested ID is present, it should be the first item.
723       // Hitting this case means the ID is not present, or we're on the second
724       // item and should break out.
725       if (id != null &&
726           !(info.getId().equals(id))) {
727         break;
728       }
729       if (filter.getPool() != null &&
730           !info.getPool().equals(filter.getPool())) {
731         continue;
732       }
733       if (filterPath != null &&
734           !info.getPath().toUri().getPath().equals(filterPath)) {
735         continue;
736       }
737       boolean hasPermission = true;
738       if (pc != null) {
739         try {
740           pc.checkPermission(curDirective.getPool(), FsAction.READ);
741         } catch (AccessControlException e) {
742           hasPermission = false;
743         }
744       }
745       if (hasPermission) {
746         replies.add(new CacheDirectiveEntry(info, cur.getValue().toStats()));
747         numReplies++;
748       }
749     }
750     return new BatchedListEntries<CacheDirectiveEntry>(replies, false);
751   }
752 
753   /**
754    * Create a cache pool.
755    *
756    * Only the superuser should be able to call this function.
757    *
758    * @param info    The info for the cache pool to create.
759    * @return        Information about the cache pool we created.
760    */
addCachePool(CachePoolInfo info)761   public CachePoolInfo addCachePool(CachePoolInfo info)
762       throws IOException {
763     assert namesystem.hasWriteLock();
764     CachePool pool;
765     try {
766       CachePoolInfo.validate(info);
767       String poolName = info.getPoolName();
768       pool = cachePools.get(poolName);
769       if (pool != null) {
770         throw new InvalidRequestException("Cache pool " + poolName
771             + " already exists.");
772       }
773       pool = CachePool.createFromInfoAndDefaults(info);
774       cachePools.put(pool.getPoolName(), pool);
775     } catch (IOException e) {
776       LOG.info("addCachePool of " + info + " failed: ", e);
777       throw e;
778     }
779     LOG.info("addCachePool of {} successful.", info);
780     return pool.getInfo(true);
781   }
782 
783   /**
784    * Modify a cache pool.
785    *
786    * Only the superuser should be able to call this function.
787    *
788    * @param info
789    *          The info for the cache pool to modify.
790    */
modifyCachePool(CachePoolInfo info)791   public void modifyCachePool(CachePoolInfo info)
792       throws IOException {
793     assert namesystem.hasWriteLock();
794     StringBuilder bld = new StringBuilder();
795     try {
796       CachePoolInfo.validate(info);
797       String poolName = info.getPoolName();
798       CachePool pool = cachePools.get(poolName);
799       if (pool == null) {
800         throw new InvalidRequestException("Cache pool " + poolName
801             + " does not exist.");
802       }
803       String prefix = "";
804       if (info.getOwnerName() != null) {
805         pool.setOwnerName(info.getOwnerName());
806         bld.append(prefix).
807           append("set owner to ").append(info.getOwnerName());
808         prefix = "; ";
809       }
810       if (info.getGroupName() != null) {
811         pool.setGroupName(info.getGroupName());
812         bld.append(prefix).
813           append("set group to ").append(info.getGroupName());
814         prefix = "; ";
815       }
816       if (info.getMode() != null) {
817         pool.setMode(info.getMode());
818         bld.append(prefix).append("set mode to " + info.getMode());
819         prefix = "; ";
820       }
821       if (info.getLimit() != null) {
822         pool.setLimit(info.getLimit());
823         bld.append(prefix).append("set limit to " + info.getLimit());
824         prefix = "; ";
825         // New limit changes stats, need to set needs refresh
826         setNeedsRescan();
827       }
828       if (info.getMaxRelativeExpiryMs() != null) {
829         final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
830         pool.setMaxRelativeExpiryMs(maxRelativeExpiry);
831         bld.append(prefix).append("set maxRelativeExpiry to "
832             + maxRelativeExpiry);
833         prefix = "; ";
834       }
835       if (prefix.isEmpty()) {
836         bld.append("no changes.");
837       }
838     } catch (IOException e) {
839       LOG.info("modifyCachePool of " + info + " failed: ", e);
840       throw e;
841     }
842     LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(),
843         bld.toString());
844   }
845 
846   /**
847    * Remove a cache pool.
848    *
849    * Only the superuser should be able to call this function.
850    *
851    * @param poolName
852    *          The name for the cache pool to remove.
853    */
removeCachePool(String poolName)854   public void removeCachePool(String poolName)
855       throws IOException {
856     assert namesystem.hasWriteLock();
857     try {
858       CachePoolInfo.validateName(poolName);
859       CachePool pool = cachePools.remove(poolName);
860       if (pool == null) {
861         throw new InvalidRequestException(
862             "Cannot remove non-existent cache pool " + poolName);
863       }
864       // Remove all directives in this pool.
865       Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
866       while (iter.hasNext()) {
867         CacheDirective directive = iter.next();
868         directivesByPath.remove(directive.getPath());
869         directivesById.remove(directive.getId());
870         iter.remove();
871       }
872       setNeedsRescan();
873     } catch (IOException e) {
874       LOG.info("removeCachePool of " + poolName + " failed: ", e);
875       throw e;
876     }
877     LOG.info("removeCachePool of " + poolName + " successful.");
878   }
879 
880   public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey)881       listCachePools(FSPermissionChecker pc, String prevKey) {
882     assert namesystem.hasReadLock();
883     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
884     ArrayList<CachePoolEntry> results =
885         new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
886     SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
887     int numListed = 0;
888     for (Entry<String, CachePool> cur : tailMap.entrySet()) {
889       if (numListed++ >= maxListCachePoolsResponses) {
890         return new BatchedListEntries<CachePoolEntry>(results, true);
891       }
892       results.add(cur.getValue().getEntry(pc));
893     }
894     return new BatchedListEntries<CachePoolEntry>(results, false);
895   }
896 
setCachedLocations(LocatedBlock block)897   public void setCachedLocations(LocatedBlock block) {
898     CachedBlock cachedBlock =
899         new CachedBlock(block.getBlock().getBlockId(),
900             (short)0, false);
901     cachedBlock = cachedBlocks.get(cachedBlock);
902     if (cachedBlock == null) {
903       return;
904     }
905     List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
906     for (DatanodeDescriptor datanode : datanodes) {
907       block.addCachedLoc(datanode);
908     }
909   }
910 
processCacheReport(final DatanodeID datanodeID, final List<Long> blockIds)911   public final void processCacheReport(final DatanodeID datanodeID,
912       final List<Long> blockIds) throws IOException {
913     namesystem.writeLock();
914     final long startTime = Time.monotonicNow();
915     final long endTime;
916     try {
917       final DatanodeDescriptor datanode =
918           blockManager.getDatanodeManager().getDatanode(datanodeID);
919       if (datanode == null || !datanode.isAlive) {
920         throw new IOException(
921             "processCacheReport from dead or unregistered datanode: " +
922             datanode);
923       }
924       processCacheReportImpl(datanode, blockIds);
925     } finally {
926       endTime = Time.monotonicNow();
927       namesystem.writeUnlock();
928     }
929 
930     // Log the block report processing stats from Namenode perspective
931     final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
932     if (metrics != null) {
933       metrics.addCacheBlockReport((int) (endTime - startTime));
934     }
935     LOG.debug("Processed cache report from {}, blocks: {}, " +
936         "processing time: {} msecs", datanodeID, blockIds.size(),
937         (endTime - startTime));
938   }
939 
processCacheReportImpl(final DatanodeDescriptor datanode, final List<Long> blockIds)940   private void processCacheReportImpl(final DatanodeDescriptor datanode,
941       final List<Long> blockIds) {
942     CachedBlocksList cached = datanode.getCached();
943     cached.clear();
944     CachedBlocksList cachedList = datanode.getCached();
945     CachedBlocksList pendingCachedList = datanode.getPendingCached();
946     for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
947       long blockId = iter.next();
948       LOG.trace("Cache report from datanode {} has block {}", datanode,
949           blockId);
950       CachedBlock cachedBlock =
951           new CachedBlock(blockId, (short)0, false);
952       CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
953       // Add the block ID from the cache report to the cachedBlocks map
954       // if it's not already there.
955       if (prevCachedBlock != null) {
956         cachedBlock = prevCachedBlock;
957       } else {
958         cachedBlocks.put(cachedBlock);
959         LOG.trace("Added block {}  to cachedBlocks", cachedBlock);
960       }
961       // Add the block to the datanode's implicit cached block list
962       // if it's not already there.  Similarly, remove it from the pending
963       // cached block list if it exists there.
964       if (!cachedBlock.isPresent(cachedList)) {
965         cachedList.add(cachedBlock);
966         LOG.trace("Added block {} to CACHED list.", cachedBlock);
967       }
968       if (cachedBlock.isPresent(pendingCachedList)) {
969         pendingCachedList.remove(cachedBlock);
970         LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock);
971       }
972     }
973   }
974 
975   /**
976    * Saves the current state of the CacheManager to the DataOutput. Used
977    * to persist CacheManager state in the FSImage.
978    * @param out DataOutput to persist state
979    * @param sdPath path of the storage directory
980    * @throws IOException
981    */
saveStateCompat(DataOutputStream out, String sdPath)982   public void saveStateCompat(DataOutputStream out, String sdPath)
983       throws IOException {
984     serializerCompat.save(out, sdPath);
985   }
986 
saveState()987   public PersistState saveState() throws IOException {
988     ArrayList<CachePoolInfoProto> pools = Lists
989         .newArrayListWithCapacity(cachePools.size());
990     ArrayList<CacheDirectiveInfoProto> directives = Lists
991         .newArrayListWithCapacity(directivesById.size());
992 
993     for (CachePool pool : cachePools.values()) {
994       CachePoolInfo p = pool.getInfo(true);
995       CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder()
996           .setPoolName(p.getPoolName());
997 
998       if (p.getOwnerName() != null)
999         b.setOwnerName(p.getOwnerName());
1000 
1001       if (p.getGroupName() != null)
1002         b.setGroupName(p.getGroupName());
1003 
1004       if (p.getMode() != null)
1005         b.setMode(p.getMode().toShort());
1006 
1007       if (p.getLimit() != null)
1008         b.setLimit(p.getLimit());
1009 
1010       pools.add(b.build());
1011     }
1012 
1013     for (CacheDirective directive : directivesById.values()) {
1014       CacheDirectiveInfo info = directive.toInfo();
1015       CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder()
1016           .setId(info.getId());
1017 
1018       if (info.getPath() != null) {
1019         b.setPath(info.getPath().toUri().getPath());
1020       }
1021 
1022       if (info.getReplication() != null) {
1023         b.setReplication(info.getReplication());
1024       }
1025 
1026       if (info.getPool() != null) {
1027         b.setPool(info.getPool());
1028       }
1029 
1030       Expiration expiry = info.getExpiration();
1031       if (expiry != null) {
1032         assert (!expiry.isRelative());
1033         b.setExpiration(PBHelper.convert(expiry));
1034       }
1035 
1036       directives.add(b.build());
1037     }
1038     CacheManagerSection s = CacheManagerSection.newBuilder()
1039         .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size())
1040         .setNumDirectives(directives.size()).build();
1041 
1042     return new PersistState(s, pools, directives);
1043   }
1044 
1045   /**
1046    * Reloads CacheManager state from the passed DataInput. Used during namenode
1047    * startup to restore CacheManager state from an FSImage.
1048    * @param in DataInput from which to restore state
1049    * @throws IOException
1050    */
loadStateCompat(DataInput in)1051   public void loadStateCompat(DataInput in) throws IOException {
1052     serializerCompat.load(in);
1053   }
1054 
loadState(PersistState s)1055   public void loadState(PersistState s) throws IOException {
1056     nextDirectiveId = s.section.getNextDirectiveId();
1057     for (CachePoolInfoProto p : s.pools) {
1058       CachePoolInfo info = new CachePoolInfo(p.getPoolName());
1059       if (p.hasOwnerName())
1060         info.setOwnerName(p.getOwnerName());
1061 
1062       if (p.hasGroupName())
1063         info.setGroupName(p.getGroupName());
1064 
1065       if (p.hasMode())
1066         info.setMode(new FsPermission((short) p.getMode()));
1067 
1068       if (p.hasLimit())
1069         info.setLimit(p.getLimit());
1070 
1071       addCachePool(info);
1072     }
1073 
1074     for (CacheDirectiveInfoProto p : s.directives) {
1075       // Get pool reference by looking it up in the map
1076       final String poolName = p.getPool();
1077       CacheDirective directive = new CacheDirective(p.getId(), new Path(
1078           p.getPath()).toUri().getPath(), (short) p.getReplication(), p
1079           .getExpiration().getMillis());
1080       addCacheDirective(poolName, directive);
1081     }
1082   }
1083 
addCacheDirective(final String poolName, final CacheDirective directive)1084   private void addCacheDirective(final String poolName,
1085       final CacheDirective directive) throws IOException {
1086     CachePool pool = cachePools.get(poolName);
1087     if (pool == null) {
1088       throw new IOException("Directive refers to pool " + poolName
1089           + ", which does not exist.");
1090     }
1091     boolean addedDirective = pool.getDirectiveList().add(directive);
1092     assert addedDirective;
1093     if (directivesById.put(directive.getId(), directive) != null) {
1094       throw new IOException("A directive with ID " + directive.getId()
1095           + " already exists");
1096     }
1097     List<CacheDirective> directives = directivesByPath.get(directive.getPath());
1098     if (directives == null) {
1099       directives = new LinkedList<CacheDirective>();
1100       directivesByPath.put(directive.getPath(), directives);
1101     }
1102     directives.add(directive);
1103   }
1104 
1105   private final class SerializerCompat {
save(DataOutputStream out, String sdPath)1106     private void save(DataOutputStream out, String sdPath) throws IOException {
1107       out.writeLong(nextDirectiveId);
1108       savePools(out, sdPath);
1109       saveDirectives(out, sdPath);
1110     }
1111 
load(DataInput in)1112     private void load(DataInput in) throws IOException {
1113       nextDirectiveId = in.readLong();
1114       // pools need to be loaded first since directives point to their parent pool
1115       loadPools(in);
1116       loadDirectives(in);
1117     }
1118 
1119     /**
1120      * Save cache pools to fsimage
1121      */
savePools(DataOutputStream out, String sdPath)1122     private void savePools(DataOutputStream out,
1123         String sdPath) throws IOException {
1124       StartupProgress prog = NameNode.getStartupProgress();
1125       Step step = new Step(StepType.CACHE_POOLS, sdPath);
1126       prog.beginStep(Phase.SAVING_CHECKPOINT, step);
1127       prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
1128       Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
1129       out.writeInt(cachePools.size());
1130       for (CachePool pool: cachePools.values()) {
1131         FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
1132         counter.increment();
1133       }
1134       prog.endStep(Phase.SAVING_CHECKPOINT, step);
1135     }
1136 
1137     /*
1138      * Save cache entries to fsimage
1139      */
saveDirectives(DataOutputStream out, String sdPath)1140     private void saveDirectives(DataOutputStream out, String sdPath)
1141         throws IOException {
1142       StartupProgress prog = NameNode.getStartupProgress();
1143       Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
1144       prog.beginStep(Phase.SAVING_CHECKPOINT, step);
1145       prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
1146       Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
1147       out.writeInt(directivesById.size());
1148       for (CacheDirective directive : directivesById.values()) {
1149         FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
1150         counter.increment();
1151       }
1152       prog.endStep(Phase.SAVING_CHECKPOINT, step);
1153     }
1154 
1155     /**
1156      * Load cache pools from fsimage
1157      */
loadPools(DataInput in)1158     private void loadPools(DataInput in)
1159         throws IOException {
1160       StartupProgress prog = NameNode.getStartupProgress();
1161       Step step = new Step(StepType.CACHE_POOLS);
1162       prog.beginStep(Phase.LOADING_FSIMAGE, step);
1163       int numberOfPools = in.readInt();
1164       prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
1165       Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
1166       for (int i = 0; i < numberOfPools; i++) {
1167         addCachePool(FSImageSerialization.readCachePoolInfo(in));
1168         counter.increment();
1169       }
1170       prog.endStep(Phase.LOADING_FSIMAGE, step);
1171     }
1172 
1173     /**
1174      * Load cache directives from the fsimage
1175      */
loadDirectives(DataInput in)1176     private void loadDirectives(DataInput in) throws IOException {
1177       StartupProgress prog = NameNode.getStartupProgress();
1178       Step step = new Step(StepType.CACHE_ENTRIES);
1179       prog.beginStep(Phase.LOADING_FSIMAGE, step);
1180       int numDirectives = in.readInt();
1181       prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
1182       Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
1183       for (int i = 0; i < numDirectives; i++) {
1184         CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
1185         // Get pool reference by looking it up in the map
1186         final String poolName = info.getPool();
1187         CacheDirective directive =
1188             new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
1189                 info.getReplication(), info.getExpiration().getAbsoluteMillis());
1190         addCacheDirective(poolName, directive);
1191         counter.increment();
1192       }
1193       prog.endStep(Phase.LOADING_FSIMAGE, step);
1194     }
1195   }
1196 
waitForRescanIfNeeded()1197   public void waitForRescanIfNeeded() {
1198     crmLock.lock();
1199     try {
1200       if (monitor != null) {
1201         monitor.waitForRescanIfNeeded();
1202       }
1203     } finally {
1204       crmLock.unlock();
1205     }
1206   }
1207 
setNeedsRescan()1208   private void setNeedsRescan() {
1209     crmLock.lock();
1210     try {
1211       if (monitor != null) {
1212         monitor.setNeedsRescan();
1213       }
1214     } finally {
1215       crmLock.unlock();
1216     }
1217   }
1218 
1219   @VisibleForTesting
getCacheReplicationMonitor()1220   public Thread getCacheReplicationMonitor() {
1221     crmLock.lock();
1222     try {
1223       return monitor;
1224     } finally {
1225       crmLock.unlock();
1226     }
1227   }
1228 }
1229