1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.namenode; 19 20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT; 21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT; 22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES; 23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT; 24 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES; 25 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT; 26 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; 27 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT; 28 29 import java.io.DataInput; 30 import java.io.DataOutputStream; 31 import java.io.IOException; 32 import java.util.ArrayList; 33 import java.util.Collection; 34 import java.util.Collections; 35 import java.util.Date; 36 import java.util.EnumSet; 37 import java.util.Iterator; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.Map.Entry; 41 import java.util.SortedMap; 42 import java.util.TreeMap; 43 import java.util.concurrent.locks.ReentrantLock; 44 45 import org.apache.commons.io.IOUtils; 46 import org.apache.hadoop.classification.InterfaceAudience; 47 import org.apache.hadoop.conf.Configuration; 48 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; 49 import org.apache.hadoop.fs.CacheFlag; 50 import org.apache.hadoop.fs.InvalidRequestException; 51 import org.apache.hadoop.fs.Path; 52 import org.apache.hadoop.fs.UnresolvedLinkException; 53 import org.apache.hadoop.fs.permission.FsAction; 54 import org.apache.hadoop.fs.permission.FsPermission; 55 import org.apache.hadoop.hdfs.DFSUtil; 56 import org.apache.hadoop.hdfs.protocol.CacheDirective; 57 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; 58 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; 59 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration; 60 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; 61 import org.apache.hadoop.hdfs.protocol.CachePoolEntry; 62 import org.apache.hadoop.hdfs.protocol.CachePoolInfo; 63 import org.apache.hadoop.hdfs.protocol.DatanodeID; 64 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 65 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto; 66 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto; 67 import org.apache.hadoop.hdfs.protocolPB.PBHelper; 68 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; 69 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; 70 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; 71 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList; 72 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type; 73 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection; 74 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; 75 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; 76 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; 77 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; 78 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter; 79 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; 80 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; 81 import org.apache.hadoop.hdfs.util.ReadOnlyList; 82 import org.apache.hadoop.security.AccessControlException; 83 import org.apache.hadoop.util.GSet; 84 import org.apache.hadoop.util.LightWeightGSet; 85 import org.apache.hadoop.util.Time; 86 import org.slf4j.Logger; 87 import org.slf4j.LoggerFactory; 88 89 import com.google.common.annotations.VisibleForTesting; 90 import com.google.common.collect.Lists; 91 92 /** 93 * The Cache Manager handles caching on DataNodes. 94 * 95 * This class is instantiated by the FSNamesystem. 96 * It maintains the mapping of cached blocks to datanodes via processing 97 * datanode cache reports. Based on these reports and addition and removal of 98 * caching directives, we will schedule caching and uncaching work. 99 */ 100 @InterfaceAudience.LimitedPrivate({"HDFS"}) 101 public final class CacheManager { 102 public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); 103 104 private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f; 105 106 // TODO: add pending / underCached / schedule cached blocks stats. 107 108 /** 109 * The FSNamesystem that contains this CacheManager. 110 */ 111 private final FSNamesystem namesystem; 112 113 /** 114 * The BlockManager associated with the FSN that owns this CacheManager. 115 */ 116 private final BlockManager blockManager; 117 118 /** 119 * Cache directives, sorted by ID. 120 * 121 * listCacheDirectives relies on the ordering of elements in this map 122 * to track what has already been listed by the client. 123 */ 124 private final TreeMap<Long, CacheDirective> directivesById = 125 new TreeMap<Long, CacheDirective>(); 126 127 /** 128 * The directive ID to use for a new directive. IDs always increase, and are 129 * never reused. 130 */ 131 private long nextDirectiveId; 132 133 /** 134 * Cache directives, sorted by path 135 */ 136 private final TreeMap<String, List<CacheDirective>> directivesByPath = 137 new TreeMap<String, List<CacheDirective>>(); 138 139 /** 140 * Cache pools, sorted by name. 141 */ 142 private final TreeMap<String, CachePool> cachePools = 143 new TreeMap<String, CachePool>(); 144 145 /** 146 * Maximum number of cache pools to list in one operation. 147 */ 148 private final int maxListCachePoolsResponses; 149 150 /** 151 * Maximum number of cache pool directives to list in one operation. 152 */ 153 private final int maxListCacheDirectivesNumResponses; 154 155 /** 156 * Interval between scans in milliseconds. 157 */ 158 private final long scanIntervalMs; 159 160 /** 161 * All cached blocks. 162 */ 163 private final GSet<CachedBlock, CachedBlock> cachedBlocks; 164 165 /** 166 * Lock which protects the CacheReplicationMonitor. 167 */ 168 private final ReentrantLock crmLock = new ReentrantLock(); 169 170 private final SerializerCompat serializerCompat = new SerializerCompat(); 171 172 /** 173 * The CacheReplicationMonitor. 174 */ 175 private CacheReplicationMonitor monitor; 176 177 public static final class PersistState { 178 public final CacheManagerSection section; 179 public final List<CachePoolInfoProto> pools; 180 public final List<CacheDirectiveInfoProto> directives; 181 PersistState(CacheManagerSection section, List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives)182 public PersistState(CacheManagerSection section, 183 List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) { 184 this.section = section; 185 this.pools = pools; 186 this.directives = directives; 187 } 188 } 189 CacheManager(FSNamesystem namesystem, Configuration conf, BlockManager blockManager)190 CacheManager(FSNamesystem namesystem, Configuration conf, 191 BlockManager blockManager) { 192 this.namesystem = namesystem; 193 this.blockManager = blockManager; 194 this.nextDirectiveId = 1; 195 this.maxListCachePoolsResponses = conf.getInt( 196 DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 197 DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT); 198 this.maxListCacheDirectivesNumResponses = conf.getInt( 199 DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 200 DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT); 201 scanIntervalMs = conf.getLong( 202 DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 203 DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT); 204 float cachedBlocksPercent = conf.getFloat( 205 DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT, 206 DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT); 207 if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) { 208 LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT, 209 DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT); 210 cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT; 211 } 212 this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>( 213 LightWeightGSet.computeCapacity(cachedBlocksPercent, 214 "cachedBlocks")); 215 216 } 217 218 /** 219 * Resets all tracked directives and pools. Called during 2NN checkpointing to 220 * reset FSNamesystem state. See {@link FSNamesystem#clear()}. 221 */ clear()222 void clear() { 223 directivesById.clear(); 224 directivesByPath.clear(); 225 cachePools.clear(); 226 nextDirectiveId = 1; 227 } 228 startMonitorThread()229 public void startMonitorThread() { 230 crmLock.lock(); 231 try { 232 if (this.monitor == null) { 233 this.monitor = new CacheReplicationMonitor(namesystem, this, 234 scanIntervalMs, crmLock); 235 this.monitor.start(); 236 } 237 } finally { 238 crmLock.unlock(); 239 } 240 } 241 stopMonitorThread()242 public void stopMonitorThread() { 243 crmLock.lock(); 244 try { 245 if (this.monitor != null) { 246 CacheReplicationMonitor prevMonitor = this.monitor; 247 this.monitor = null; 248 IOUtils.closeQuietly(prevMonitor); 249 } 250 } finally { 251 crmLock.unlock(); 252 } 253 } 254 clearDirectiveStats()255 public void clearDirectiveStats() { 256 assert namesystem.hasWriteLock(); 257 for (CacheDirective directive : directivesById.values()) { 258 directive.resetStatistics(); 259 } 260 } 261 262 /** 263 * @return Unmodifiable view of the collection of CachePools. 264 */ getCachePools()265 public Collection<CachePool> getCachePools() { 266 assert namesystem.hasReadLock(); 267 return Collections.unmodifiableCollection(cachePools.values()); 268 } 269 270 /** 271 * @return Unmodifiable view of the collection of CacheDirectives. 272 */ getCacheDirectives()273 public Collection<CacheDirective> getCacheDirectives() { 274 assert namesystem.hasReadLock(); 275 return Collections.unmodifiableCollection(directivesById.values()); 276 } 277 278 @VisibleForTesting getCachedBlocks()279 public GSet<CachedBlock, CachedBlock> getCachedBlocks() { 280 assert namesystem.hasReadLock(); 281 return cachedBlocks; 282 } 283 getNextDirectiveId()284 private long getNextDirectiveId() throws IOException { 285 assert namesystem.hasWriteLock(); 286 if (nextDirectiveId >= Long.MAX_VALUE - 1) { 287 throw new IOException("No more available IDs."); 288 } 289 return nextDirectiveId++; 290 } 291 292 // Helper getter / validation methods 293 checkWritePermission(FSPermissionChecker pc, CachePool pool)294 private static void checkWritePermission(FSPermissionChecker pc, 295 CachePool pool) throws AccessControlException { 296 if ((pc != null)) { 297 pc.checkPermission(pool, FsAction.WRITE); 298 } 299 } 300 validatePoolName(CacheDirectiveInfo directive)301 private static String validatePoolName(CacheDirectiveInfo directive) 302 throws InvalidRequestException { 303 String pool = directive.getPool(); 304 if (pool == null) { 305 throw new InvalidRequestException("No pool specified."); 306 } 307 if (pool.isEmpty()) { 308 throw new InvalidRequestException("Invalid empty pool name."); 309 } 310 return pool; 311 } 312 validatePath(CacheDirectiveInfo directive)313 private static String validatePath(CacheDirectiveInfo directive) 314 throws InvalidRequestException { 315 if (directive.getPath() == null) { 316 throw new InvalidRequestException("No path specified."); 317 } 318 String path = directive.getPath().toUri().getPath(); 319 if (!DFSUtil.isValidName(path)) { 320 throw new InvalidRequestException("Invalid path '" + path + "'."); 321 } 322 return path; 323 } 324 validateReplication(CacheDirectiveInfo directive, short defaultValue)325 private static short validateReplication(CacheDirectiveInfo directive, 326 short defaultValue) throws InvalidRequestException { 327 short repl = (directive.getReplication() != null) 328 ? directive.getReplication() : defaultValue; 329 if (repl <= 0) { 330 throw new InvalidRequestException("Invalid replication factor " + repl 331 + " <= 0"); 332 } 333 return repl; 334 } 335 336 /** 337 * Calculates the absolute expiry time of the directive from the 338 * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration 339 * into an absolute time based on the local clock. 340 * 341 * @param info to validate. 342 * @param maxRelativeExpiryTime of the info's pool. 343 * @return the expiration time, or the pool's max absolute expiration if the 344 * info's expiration was not set. 345 * @throws InvalidRequestException if the info's Expiration is invalid. 346 */ validateExpiryTime(CacheDirectiveInfo info, long maxRelativeExpiryTime)347 private static long validateExpiryTime(CacheDirectiveInfo info, 348 long maxRelativeExpiryTime) throws InvalidRequestException { 349 LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info, 350 maxRelativeExpiryTime); 351 final long now = new Date().getTime(); 352 final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime; 353 if (info == null || info.getExpiration() == null) { 354 return maxAbsoluteExpiryTime; 355 } 356 Expiration expiry = info.getExpiration(); 357 if (expiry.getMillis() < 0l) { 358 throw new InvalidRequestException("Cannot set a negative expiration: " 359 + expiry.getMillis()); 360 } 361 long relExpiryTime, absExpiryTime; 362 if (expiry.isRelative()) { 363 relExpiryTime = expiry.getMillis(); 364 absExpiryTime = now + relExpiryTime; 365 } else { 366 absExpiryTime = expiry.getMillis(); 367 relExpiryTime = absExpiryTime - now; 368 } 369 // Need to cap the expiry so we don't overflow a long when doing math 370 if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) { 371 throw new InvalidRequestException("Expiration " 372 + expiry.toString() + " is too far in the future!"); 373 } 374 // Fail if the requested expiry is greater than the max 375 if (relExpiryTime > maxRelativeExpiryTime) { 376 throw new InvalidRequestException("Expiration " + expiry.toString() 377 + " exceeds the max relative expiration time of " 378 + maxRelativeExpiryTime + " ms."); 379 } 380 return absExpiryTime; 381 } 382 383 /** 384 * Throws an exception if the CachePool does not have enough capacity to 385 * cache the given path at the replication factor. 386 * 387 * @param pool CachePool where the path is being cached 388 * @param path Path that is being cached 389 * @param replication Replication factor of the path 390 * @throws InvalidRequestException if the pool does not have enough capacity 391 */ checkLimit(CachePool pool, String path, short replication)392 private void checkLimit(CachePool pool, String path, 393 short replication) throws InvalidRequestException { 394 CacheDirectiveStats stats = computeNeeded(path, replication); 395 if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) { 396 return; 397 } 398 if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool 399 .getLimit()) { 400 throw new InvalidRequestException("Caching path " + path + " of size " 401 + stats.getBytesNeeded() / replication + " bytes at replication " 402 + replication + " would exceed pool " + pool.getPoolName() 403 + "'s remaining capacity of " 404 + (pool.getLimit() - pool.getBytesNeeded()) + " bytes."); 405 } 406 } 407 408 /** 409 * Computes the needed number of bytes and files for a path. 410 * @return CacheDirectiveStats describing the needed stats for this path 411 */ computeNeeded(String path, short replication)412 private CacheDirectiveStats computeNeeded(String path, short replication) { 413 FSDirectory fsDir = namesystem.getFSDirectory(); 414 INode node; 415 long requestedBytes = 0; 416 long requestedFiles = 0; 417 CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder(); 418 try { 419 node = fsDir.getINode(path); 420 } catch (UnresolvedLinkException e) { 421 // We don't cache through symlinks 422 return builder.build(); 423 } 424 if (node == null) { 425 return builder.build(); 426 } 427 if (node.isFile()) { 428 requestedFiles = 1; 429 INodeFile file = node.asFile(); 430 requestedBytes = file.computeFileSize(); 431 } else if (node.isDirectory()) { 432 INodeDirectory dir = node.asDirectory(); 433 ReadOnlyList<INode> children = dir 434 .getChildrenList(Snapshot.CURRENT_STATE_ID); 435 requestedFiles = children.size(); 436 for (INode child : children) { 437 if (child.isFile()) { 438 requestedBytes += child.asFile().computeFileSize(); 439 } 440 } 441 } 442 return new CacheDirectiveStats.Builder() 443 .setBytesNeeded(requestedBytes) 444 .setFilesCached(requestedFiles) 445 .build(); 446 } 447 448 /** 449 * Get a CacheDirective by ID, validating the ID and that the directive 450 * exists. 451 */ getById(long id)452 private CacheDirective getById(long id) throws InvalidRequestException { 453 // Check for invalid IDs. 454 if (id <= 0) { 455 throw new InvalidRequestException("Invalid negative ID."); 456 } 457 // Find the directive. 458 CacheDirective directive = directivesById.get(id); 459 if (directive == null) { 460 throw new InvalidRequestException("No directive with ID " + id 461 + " found."); 462 } 463 return directive; 464 } 465 466 /** 467 * Get a CachePool by name, validating that it exists. 468 */ getCachePool(String poolName)469 private CachePool getCachePool(String poolName) 470 throws InvalidRequestException { 471 CachePool pool = cachePools.get(poolName); 472 if (pool == null) { 473 throw new InvalidRequestException("Unknown pool " + poolName); 474 } 475 return pool; 476 } 477 478 // RPC handlers 479 addInternal(CacheDirective directive, CachePool pool)480 private void addInternal(CacheDirective directive, CachePool pool) { 481 boolean addedDirective = pool.getDirectiveList().add(directive); 482 assert addedDirective; 483 directivesById.put(directive.getId(), directive); 484 String path = directive.getPath(); 485 List<CacheDirective> directives = directivesByPath.get(path); 486 if (directives == null) { 487 directives = new ArrayList<CacheDirective>(1); 488 directivesByPath.put(path, directives); 489 } 490 directives.add(directive); 491 // Fix up pool stats 492 CacheDirectiveStats stats = 493 computeNeeded(directive.getPath(), directive.getReplication()); 494 directive.addBytesNeeded(stats.getBytesNeeded()); 495 directive.addFilesNeeded(directive.getFilesNeeded()); 496 497 setNeedsRescan(); 498 } 499 500 /** 501 * Adds a directive, skipping most error checking. This should only be called 502 * internally in special scenarios like edit log replay. 503 */ addDirectiveFromEditLog(CacheDirectiveInfo directive)504 CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive) 505 throws InvalidRequestException { 506 long id = directive.getId(); 507 CacheDirective entry = new CacheDirective(directive); 508 CachePool pool = cachePools.get(directive.getPool()); 509 addInternal(entry, pool); 510 if (nextDirectiveId <= id) { 511 nextDirectiveId = id + 1; 512 } 513 return entry.toInfo(); 514 } 515 addDirective( CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)516 public CacheDirectiveInfo addDirective( 517 CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags) 518 throws IOException { 519 assert namesystem.hasWriteLock(); 520 CacheDirective directive; 521 try { 522 CachePool pool = getCachePool(validatePoolName(info)); 523 checkWritePermission(pc, pool); 524 String path = validatePath(info); 525 short replication = validateReplication(info, (short)1); 526 long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs()); 527 // Do quota validation if required 528 if (!flags.contains(CacheFlag.FORCE)) { 529 checkLimit(pool, path, replication); 530 } 531 // All validation passed 532 // Add a new entry with the next available ID. 533 long id = getNextDirectiveId(); 534 directive = new CacheDirective(id, path, replication, expiryTime); 535 addInternal(directive, pool); 536 } catch (IOException e) { 537 LOG.warn("addDirective of " + info + " failed: ", e); 538 throw e; 539 } 540 LOG.info("addDirective of {} successful.", info); 541 return directive.toInfo(); 542 } 543 544 /** 545 * Factory method that makes a new CacheDirectiveInfo by applying fields in a 546 * CacheDirectiveInfo to an existing CacheDirective. 547 * 548 * @param info with some or all fields set. 549 * @param defaults directive providing default values for unset fields in 550 * info. 551 * 552 * @return new CacheDirectiveInfo of the info applied to the defaults. 553 */ createFromInfoAndDefaults( CacheDirectiveInfo info, CacheDirective defaults)554 private static CacheDirectiveInfo createFromInfoAndDefaults( 555 CacheDirectiveInfo info, CacheDirective defaults) { 556 // Initialize the builder with the default values 557 CacheDirectiveInfo.Builder builder = 558 new CacheDirectiveInfo.Builder(defaults.toInfo()); 559 // Replace default with new value if present 560 if (info.getPath() != null) { 561 builder.setPath(info.getPath()); 562 } 563 if (info.getReplication() != null) { 564 builder.setReplication(info.getReplication()); 565 } 566 if (info.getPool() != null) { 567 builder.setPool(info.getPool()); 568 } 569 if (info.getExpiration() != null) { 570 builder.setExpiration(info.getExpiration()); 571 } 572 return builder.build(); 573 } 574 575 /** 576 * Modifies a directive, skipping most error checking. This is for careful 577 * internal use only. modifyDirective can be non-deterministic since its error 578 * checking depends on current system time, which poses a problem for edit log 579 * replay. 580 */ modifyDirectiveFromEditLog(CacheDirectiveInfo info)581 void modifyDirectiveFromEditLog(CacheDirectiveInfo info) 582 throws InvalidRequestException { 583 // Check for invalid IDs. 584 Long id = info.getId(); 585 if (id == null) { 586 throw new InvalidRequestException("Must supply an ID."); 587 } 588 CacheDirective prevEntry = getById(id); 589 CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry); 590 removeInternal(prevEntry); 591 addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool())); 592 } 593 modifyDirective(CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)594 public void modifyDirective(CacheDirectiveInfo info, 595 FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { 596 assert namesystem.hasWriteLock(); 597 String idString = 598 (info.getId() == null) ? 599 "(null)" : info.getId().toString(); 600 try { 601 // Check for invalid IDs. 602 Long id = info.getId(); 603 if (id == null) { 604 throw new InvalidRequestException("Must supply an ID."); 605 } 606 CacheDirective prevEntry = getById(id); 607 checkWritePermission(pc, prevEntry.getPool()); 608 609 // Fill in defaults 610 CacheDirectiveInfo infoWithDefaults = 611 createFromInfoAndDefaults(info, prevEntry); 612 CacheDirectiveInfo.Builder builder = 613 new CacheDirectiveInfo.Builder(infoWithDefaults); 614 615 // Do validation 616 validatePath(infoWithDefaults); 617 validateReplication(infoWithDefaults, (short)-1); 618 // Need to test the pool being set here to avoid rejecting a modify for a 619 // directive that's already been forced into a pool 620 CachePool srcPool = prevEntry.getPool(); 621 CachePool destPool = getCachePool(validatePoolName(infoWithDefaults)); 622 if (!srcPool.getPoolName().equals(destPool.getPoolName())) { 623 checkWritePermission(pc, destPool); 624 if (!flags.contains(CacheFlag.FORCE)) { 625 checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(), 626 infoWithDefaults.getReplication()); 627 } 628 } 629 // Verify the expiration against the destination pool 630 validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs()); 631 632 // Indicate changes to the CRM 633 setNeedsRescan(); 634 635 // Validation passed 636 removeInternal(prevEntry); 637 addInternal(new CacheDirective(builder.build()), destPool); 638 } catch (IOException e) { 639 LOG.warn("modifyDirective of " + idString + " failed: ", e); 640 throw e; 641 } 642 LOG.info("modifyDirective of {} successfully applied {}.", idString, info); 643 } 644 removeInternal(CacheDirective directive)645 private void removeInternal(CacheDirective directive) 646 throws InvalidRequestException { 647 assert namesystem.hasWriteLock(); 648 // Remove the corresponding entry in directivesByPath. 649 String path = directive.getPath(); 650 List<CacheDirective> directives = directivesByPath.get(path); 651 if (directives == null || !directives.remove(directive)) { 652 throw new InvalidRequestException("Failed to locate entry " + 653 directive.getId() + " by path " + directive.getPath()); 654 } 655 if (directives.size() == 0) { 656 directivesByPath.remove(path); 657 } 658 // Fix up the stats from removing the pool 659 final CachePool pool = directive.getPool(); 660 directive.addBytesNeeded(-directive.getBytesNeeded()); 661 directive.addFilesNeeded(-directive.getFilesNeeded()); 662 663 directivesById.remove(directive.getId()); 664 pool.getDirectiveList().remove(directive); 665 assert directive.getPool() == null; 666 667 setNeedsRescan(); 668 } 669 removeDirective(long id, FSPermissionChecker pc)670 public void removeDirective(long id, FSPermissionChecker pc) 671 throws IOException { 672 assert namesystem.hasWriteLock(); 673 try { 674 CacheDirective directive = getById(id); 675 checkWritePermission(pc, directive.getPool()); 676 removeInternal(directive); 677 } catch (IOException e) { 678 LOG.warn("removeDirective of " + id + " failed: ", e); 679 throw e; 680 } 681 LOG.info("removeDirective of " + id + " successful."); 682 } 683 684 public BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(long prevId, CacheDirectiveInfo filter, FSPermissionChecker pc)685 listCacheDirectives(long prevId, 686 CacheDirectiveInfo filter, 687 FSPermissionChecker pc) throws IOException { 688 assert namesystem.hasReadLock(); 689 final int NUM_PRE_ALLOCATED_ENTRIES = 16; 690 String filterPath = null; 691 if (filter.getPath() != null) { 692 filterPath = validatePath(filter); 693 } 694 if (filter.getReplication() != null) { 695 throw new InvalidRequestException( 696 "Filtering by replication is unsupported."); 697 } 698 699 // Querying for a single ID 700 final Long id = filter.getId(); 701 if (id != null) { 702 if (!directivesById.containsKey(id)) { 703 throw new InvalidRequestException("Did not find requested id " + id); 704 } 705 // Since we use a tailMap on directivesById, setting prev to id-1 gets 706 // us the directive with the id (if present) 707 prevId = id - 1; 708 } 709 710 ArrayList<CacheDirectiveEntry> replies = 711 new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES); 712 int numReplies = 0; 713 SortedMap<Long, CacheDirective> tailMap = 714 directivesById.tailMap(prevId + 1); 715 for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) { 716 if (numReplies >= maxListCacheDirectivesNumResponses) { 717 return new BatchedListEntries<CacheDirectiveEntry>(replies, true); 718 } 719 CacheDirective curDirective = cur.getValue(); 720 CacheDirectiveInfo info = cur.getValue().toInfo(); 721 722 // If the requested ID is present, it should be the first item. 723 // Hitting this case means the ID is not present, or we're on the second 724 // item and should break out. 725 if (id != null && 726 !(info.getId().equals(id))) { 727 break; 728 } 729 if (filter.getPool() != null && 730 !info.getPool().equals(filter.getPool())) { 731 continue; 732 } 733 if (filterPath != null && 734 !info.getPath().toUri().getPath().equals(filterPath)) { 735 continue; 736 } 737 boolean hasPermission = true; 738 if (pc != null) { 739 try { 740 pc.checkPermission(curDirective.getPool(), FsAction.READ); 741 } catch (AccessControlException e) { 742 hasPermission = false; 743 } 744 } 745 if (hasPermission) { 746 replies.add(new CacheDirectiveEntry(info, cur.getValue().toStats())); 747 numReplies++; 748 } 749 } 750 return new BatchedListEntries<CacheDirectiveEntry>(replies, false); 751 } 752 753 /** 754 * Create a cache pool. 755 * 756 * Only the superuser should be able to call this function. 757 * 758 * @param info The info for the cache pool to create. 759 * @return Information about the cache pool we created. 760 */ addCachePool(CachePoolInfo info)761 public CachePoolInfo addCachePool(CachePoolInfo info) 762 throws IOException { 763 assert namesystem.hasWriteLock(); 764 CachePool pool; 765 try { 766 CachePoolInfo.validate(info); 767 String poolName = info.getPoolName(); 768 pool = cachePools.get(poolName); 769 if (pool != null) { 770 throw new InvalidRequestException("Cache pool " + poolName 771 + " already exists."); 772 } 773 pool = CachePool.createFromInfoAndDefaults(info); 774 cachePools.put(pool.getPoolName(), pool); 775 } catch (IOException e) { 776 LOG.info("addCachePool of " + info + " failed: ", e); 777 throw e; 778 } 779 LOG.info("addCachePool of {} successful.", info); 780 return pool.getInfo(true); 781 } 782 783 /** 784 * Modify a cache pool. 785 * 786 * Only the superuser should be able to call this function. 787 * 788 * @param info 789 * The info for the cache pool to modify. 790 */ modifyCachePool(CachePoolInfo info)791 public void modifyCachePool(CachePoolInfo info) 792 throws IOException { 793 assert namesystem.hasWriteLock(); 794 StringBuilder bld = new StringBuilder(); 795 try { 796 CachePoolInfo.validate(info); 797 String poolName = info.getPoolName(); 798 CachePool pool = cachePools.get(poolName); 799 if (pool == null) { 800 throw new InvalidRequestException("Cache pool " + poolName 801 + " does not exist."); 802 } 803 String prefix = ""; 804 if (info.getOwnerName() != null) { 805 pool.setOwnerName(info.getOwnerName()); 806 bld.append(prefix). 807 append("set owner to ").append(info.getOwnerName()); 808 prefix = "; "; 809 } 810 if (info.getGroupName() != null) { 811 pool.setGroupName(info.getGroupName()); 812 bld.append(prefix). 813 append("set group to ").append(info.getGroupName()); 814 prefix = "; "; 815 } 816 if (info.getMode() != null) { 817 pool.setMode(info.getMode()); 818 bld.append(prefix).append("set mode to " + info.getMode()); 819 prefix = "; "; 820 } 821 if (info.getLimit() != null) { 822 pool.setLimit(info.getLimit()); 823 bld.append(prefix).append("set limit to " + info.getLimit()); 824 prefix = "; "; 825 // New limit changes stats, need to set needs refresh 826 setNeedsRescan(); 827 } 828 if (info.getMaxRelativeExpiryMs() != null) { 829 final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs(); 830 pool.setMaxRelativeExpiryMs(maxRelativeExpiry); 831 bld.append(prefix).append("set maxRelativeExpiry to " 832 + maxRelativeExpiry); 833 prefix = "; "; 834 } 835 if (prefix.isEmpty()) { 836 bld.append("no changes."); 837 } 838 } catch (IOException e) { 839 LOG.info("modifyCachePool of " + info + " failed: ", e); 840 throw e; 841 } 842 LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(), 843 bld.toString()); 844 } 845 846 /** 847 * Remove a cache pool. 848 * 849 * Only the superuser should be able to call this function. 850 * 851 * @param poolName 852 * The name for the cache pool to remove. 853 */ removeCachePool(String poolName)854 public void removeCachePool(String poolName) 855 throws IOException { 856 assert namesystem.hasWriteLock(); 857 try { 858 CachePoolInfo.validateName(poolName); 859 CachePool pool = cachePools.remove(poolName); 860 if (pool == null) { 861 throw new InvalidRequestException( 862 "Cannot remove non-existent cache pool " + poolName); 863 } 864 // Remove all directives in this pool. 865 Iterator<CacheDirective> iter = pool.getDirectiveList().iterator(); 866 while (iter.hasNext()) { 867 CacheDirective directive = iter.next(); 868 directivesByPath.remove(directive.getPath()); 869 directivesById.remove(directive.getId()); 870 iter.remove(); 871 } 872 setNeedsRescan(); 873 } catch (IOException e) { 874 LOG.info("removeCachePool of " + poolName + " failed: ", e); 875 throw e; 876 } 877 LOG.info("removeCachePool of " + poolName + " successful."); 878 } 879 880 public BatchedListEntries<CachePoolEntry> listCachePools(FSPermissionChecker pc, String prevKey)881 listCachePools(FSPermissionChecker pc, String prevKey) { 882 assert namesystem.hasReadLock(); 883 final int NUM_PRE_ALLOCATED_ENTRIES = 16; 884 ArrayList<CachePoolEntry> results = 885 new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES); 886 SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false); 887 int numListed = 0; 888 for (Entry<String, CachePool> cur : tailMap.entrySet()) { 889 if (numListed++ >= maxListCachePoolsResponses) { 890 return new BatchedListEntries<CachePoolEntry>(results, true); 891 } 892 results.add(cur.getValue().getEntry(pc)); 893 } 894 return new BatchedListEntries<CachePoolEntry>(results, false); 895 } 896 setCachedLocations(LocatedBlock block)897 public void setCachedLocations(LocatedBlock block) { 898 CachedBlock cachedBlock = 899 new CachedBlock(block.getBlock().getBlockId(), 900 (short)0, false); 901 cachedBlock = cachedBlocks.get(cachedBlock); 902 if (cachedBlock == null) { 903 return; 904 } 905 List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED); 906 for (DatanodeDescriptor datanode : datanodes) { 907 block.addCachedLoc(datanode); 908 } 909 } 910 processCacheReport(final DatanodeID datanodeID, final List<Long> blockIds)911 public final void processCacheReport(final DatanodeID datanodeID, 912 final List<Long> blockIds) throws IOException { 913 namesystem.writeLock(); 914 final long startTime = Time.monotonicNow(); 915 final long endTime; 916 try { 917 final DatanodeDescriptor datanode = 918 blockManager.getDatanodeManager().getDatanode(datanodeID); 919 if (datanode == null || !datanode.isAlive) { 920 throw new IOException( 921 "processCacheReport from dead or unregistered datanode: " + 922 datanode); 923 } 924 processCacheReportImpl(datanode, blockIds); 925 } finally { 926 endTime = Time.monotonicNow(); 927 namesystem.writeUnlock(); 928 } 929 930 // Log the block report processing stats from Namenode perspective 931 final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); 932 if (metrics != null) { 933 metrics.addCacheBlockReport((int) (endTime - startTime)); 934 } 935 LOG.debug("Processed cache report from {}, blocks: {}, " + 936 "processing time: {} msecs", datanodeID, blockIds.size(), 937 (endTime - startTime)); 938 } 939 processCacheReportImpl(final DatanodeDescriptor datanode, final List<Long> blockIds)940 private void processCacheReportImpl(final DatanodeDescriptor datanode, 941 final List<Long> blockIds) { 942 CachedBlocksList cached = datanode.getCached(); 943 cached.clear(); 944 CachedBlocksList cachedList = datanode.getCached(); 945 CachedBlocksList pendingCachedList = datanode.getPendingCached(); 946 for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { 947 long blockId = iter.next(); 948 LOG.trace("Cache report from datanode {} has block {}", datanode, 949 blockId); 950 CachedBlock cachedBlock = 951 new CachedBlock(blockId, (short)0, false); 952 CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock); 953 // Add the block ID from the cache report to the cachedBlocks map 954 // if it's not already there. 955 if (prevCachedBlock != null) { 956 cachedBlock = prevCachedBlock; 957 } else { 958 cachedBlocks.put(cachedBlock); 959 LOG.trace("Added block {} to cachedBlocks", cachedBlock); 960 } 961 // Add the block to the datanode's implicit cached block list 962 // if it's not already there. Similarly, remove it from the pending 963 // cached block list if it exists there. 964 if (!cachedBlock.isPresent(cachedList)) { 965 cachedList.add(cachedBlock); 966 LOG.trace("Added block {} to CACHED list.", cachedBlock); 967 } 968 if (cachedBlock.isPresent(pendingCachedList)) { 969 pendingCachedList.remove(cachedBlock); 970 LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock); 971 } 972 } 973 } 974 975 /** 976 * Saves the current state of the CacheManager to the DataOutput. Used 977 * to persist CacheManager state in the FSImage. 978 * @param out DataOutput to persist state 979 * @param sdPath path of the storage directory 980 * @throws IOException 981 */ saveStateCompat(DataOutputStream out, String sdPath)982 public void saveStateCompat(DataOutputStream out, String sdPath) 983 throws IOException { 984 serializerCompat.save(out, sdPath); 985 } 986 saveState()987 public PersistState saveState() throws IOException { 988 ArrayList<CachePoolInfoProto> pools = Lists 989 .newArrayListWithCapacity(cachePools.size()); 990 ArrayList<CacheDirectiveInfoProto> directives = Lists 991 .newArrayListWithCapacity(directivesById.size()); 992 993 for (CachePool pool : cachePools.values()) { 994 CachePoolInfo p = pool.getInfo(true); 995 CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder() 996 .setPoolName(p.getPoolName()); 997 998 if (p.getOwnerName() != null) 999 b.setOwnerName(p.getOwnerName()); 1000 1001 if (p.getGroupName() != null) 1002 b.setGroupName(p.getGroupName()); 1003 1004 if (p.getMode() != null) 1005 b.setMode(p.getMode().toShort()); 1006 1007 if (p.getLimit() != null) 1008 b.setLimit(p.getLimit()); 1009 1010 pools.add(b.build()); 1011 } 1012 1013 for (CacheDirective directive : directivesById.values()) { 1014 CacheDirectiveInfo info = directive.toInfo(); 1015 CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder() 1016 .setId(info.getId()); 1017 1018 if (info.getPath() != null) { 1019 b.setPath(info.getPath().toUri().getPath()); 1020 } 1021 1022 if (info.getReplication() != null) { 1023 b.setReplication(info.getReplication()); 1024 } 1025 1026 if (info.getPool() != null) { 1027 b.setPool(info.getPool()); 1028 } 1029 1030 Expiration expiry = info.getExpiration(); 1031 if (expiry != null) { 1032 assert (!expiry.isRelative()); 1033 b.setExpiration(PBHelper.convert(expiry)); 1034 } 1035 1036 directives.add(b.build()); 1037 } 1038 CacheManagerSection s = CacheManagerSection.newBuilder() 1039 .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size()) 1040 .setNumDirectives(directives.size()).build(); 1041 1042 return new PersistState(s, pools, directives); 1043 } 1044 1045 /** 1046 * Reloads CacheManager state from the passed DataInput. Used during namenode 1047 * startup to restore CacheManager state from an FSImage. 1048 * @param in DataInput from which to restore state 1049 * @throws IOException 1050 */ loadStateCompat(DataInput in)1051 public void loadStateCompat(DataInput in) throws IOException { 1052 serializerCompat.load(in); 1053 } 1054 loadState(PersistState s)1055 public void loadState(PersistState s) throws IOException { 1056 nextDirectiveId = s.section.getNextDirectiveId(); 1057 for (CachePoolInfoProto p : s.pools) { 1058 CachePoolInfo info = new CachePoolInfo(p.getPoolName()); 1059 if (p.hasOwnerName()) 1060 info.setOwnerName(p.getOwnerName()); 1061 1062 if (p.hasGroupName()) 1063 info.setGroupName(p.getGroupName()); 1064 1065 if (p.hasMode()) 1066 info.setMode(new FsPermission((short) p.getMode())); 1067 1068 if (p.hasLimit()) 1069 info.setLimit(p.getLimit()); 1070 1071 addCachePool(info); 1072 } 1073 1074 for (CacheDirectiveInfoProto p : s.directives) { 1075 // Get pool reference by looking it up in the map 1076 final String poolName = p.getPool(); 1077 CacheDirective directive = new CacheDirective(p.getId(), new Path( 1078 p.getPath()).toUri().getPath(), (short) p.getReplication(), p 1079 .getExpiration().getMillis()); 1080 addCacheDirective(poolName, directive); 1081 } 1082 } 1083 addCacheDirective(final String poolName, final CacheDirective directive)1084 private void addCacheDirective(final String poolName, 1085 final CacheDirective directive) throws IOException { 1086 CachePool pool = cachePools.get(poolName); 1087 if (pool == null) { 1088 throw new IOException("Directive refers to pool " + poolName 1089 + ", which does not exist."); 1090 } 1091 boolean addedDirective = pool.getDirectiveList().add(directive); 1092 assert addedDirective; 1093 if (directivesById.put(directive.getId(), directive) != null) { 1094 throw new IOException("A directive with ID " + directive.getId() 1095 + " already exists"); 1096 } 1097 List<CacheDirective> directives = directivesByPath.get(directive.getPath()); 1098 if (directives == null) { 1099 directives = new LinkedList<CacheDirective>(); 1100 directivesByPath.put(directive.getPath(), directives); 1101 } 1102 directives.add(directive); 1103 } 1104 1105 private final class SerializerCompat { save(DataOutputStream out, String sdPath)1106 private void save(DataOutputStream out, String sdPath) throws IOException { 1107 out.writeLong(nextDirectiveId); 1108 savePools(out, sdPath); 1109 saveDirectives(out, sdPath); 1110 } 1111 load(DataInput in)1112 private void load(DataInput in) throws IOException { 1113 nextDirectiveId = in.readLong(); 1114 // pools need to be loaded first since directives point to their parent pool 1115 loadPools(in); 1116 loadDirectives(in); 1117 } 1118 1119 /** 1120 * Save cache pools to fsimage 1121 */ savePools(DataOutputStream out, String sdPath)1122 private void savePools(DataOutputStream out, 1123 String sdPath) throws IOException { 1124 StartupProgress prog = NameNode.getStartupProgress(); 1125 Step step = new Step(StepType.CACHE_POOLS, sdPath); 1126 prog.beginStep(Phase.SAVING_CHECKPOINT, step); 1127 prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size()); 1128 Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); 1129 out.writeInt(cachePools.size()); 1130 for (CachePool pool: cachePools.values()) { 1131 FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true)); 1132 counter.increment(); 1133 } 1134 prog.endStep(Phase.SAVING_CHECKPOINT, step); 1135 } 1136 1137 /* 1138 * Save cache entries to fsimage 1139 */ saveDirectives(DataOutputStream out, String sdPath)1140 private void saveDirectives(DataOutputStream out, String sdPath) 1141 throws IOException { 1142 StartupProgress prog = NameNode.getStartupProgress(); 1143 Step step = new Step(StepType.CACHE_ENTRIES, sdPath); 1144 prog.beginStep(Phase.SAVING_CHECKPOINT, step); 1145 prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size()); 1146 Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); 1147 out.writeInt(directivesById.size()); 1148 for (CacheDirective directive : directivesById.values()) { 1149 FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo()); 1150 counter.increment(); 1151 } 1152 prog.endStep(Phase.SAVING_CHECKPOINT, step); 1153 } 1154 1155 /** 1156 * Load cache pools from fsimage 1157 */ loadPools(DataInput in)1158 private void loadPools(DataInput in) 1159 throws IOException { 1160 StartupProgress prog = NameNode.getStartupProgress(); 1161 Step step = new Step(StepType.CACHE_POOLS); 1162 prog.beginStep(Phase.LOADING_FSIMAGE, step); 1163 int numberOfPools = in.readInt(); 1164 prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools); 1165 Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); 1166 for (int i = 0; i < numberOfPools; i++) { 1167 addCachePool(FSImageSerialization.readCachePoolInfo(in)); 1168 counter.increment(); 1169 } 1170 prog.endStep(Phase.LOADING_FSIMAGE, step); 1171 } 1172 1173 /** 1174 * Load cache directives from the fsimage 1175 */ loadDirectives(DataInput in)1176 private void loadDirectives(DataInput in) throws IOException { 1177 StartupProgress prog = NameNode.getStartupProgress(); 1178 Step step = new Step(StepType.CACHE_ENTRIES); 1179 prog.beginStep(Phase.LOADING_FSIMAGE, step); 1180 int numDirectives = in.readInt(); 1181 prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives); 1182 Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); 1183 for (int i = 0; i < numDirectives; i++) { 1184 CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in); 1185 // Get pool reference by looking it up in the map 1186 final String poolName = info.getPool(); 1187 CacheDirective directive = 1188 new CacheDirective(info.getId(), info.getPath().toUri().getPath(), 1189 info.getReplication(), info.getExpiration().getAbsoluteMillis()); 1190 addCacheDirective(poolName, directive); 1191 counter.increment(); 1192 } 1193 prog.endStep(Phase.LOADING_FSIMAGE, step); 1194 } 1195 } 1196 waitForRescanIfNeeded()1197 public void waitForRescanIfNeeded() { 1198 crmLock.lock(); 1199 try { 1200 if (monitor != null) { 1201 monitor.waitForRescanIfNeeded(); 1202 } 1203 } finally { 1204 crmLock.unlock(); 1205 } 1206 } 1207 setNeedsRescan()1208 private void setNeedsRescan() { 1209 crmLock.lock(); 1210 try { 1211 if (monitor != null) { 1212 monitor.setNeedsRescan(); 1213 } 1214 } finally { 1215 crmLock.unlock(); 1216 } 1217 } 1218 1219 @VisibleForTesting getCacheReplicationMonitor()1220 public Thread getCacheReplicationMonitor() { 1221 crmLock.lock(); 1222 try { 1223 return monitor; 1224 } finally { 1225 crmLock.unlock(); 1226 } 1227 } 1228 } 1229