/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.DFSUtil;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;

/**
 * Keeps a Collection for every named machine containing blocks
 * that have recently been invalidated and are thought to live
 * on the machine in question.
 *
 * <p>All methods that read or modify the per-datanode map or the block
 * counter are {@code synchronized} on this instance.
 */
@InterfaceAudience.Private
class InvalidateBlocks {
  /** Mapping: DatanodeInfo -> Collection of Blocks */
  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
  /** The total number of blocks in the map. */
  private long numBlocks = 0L;

  /** Upper bound passed to {@code pollN} on each {@link #invalidateWork} call. */
  private final int blockInvalidateLimit;

  /**
   * The period of pending time for block invalidation since the NameNode
   * startup
   */
  private final long pendingPeriodInMs;
  /** the startup time */
  private final long startupTime = Time.monotonicNow();

  /**
   * Creates an empty collection and logs the configured deletion delay.
   *
   * @param blockInvalidateLimit maximum number of blocks handed out per
   *        {@link #invalidateWork} call
   * @param pendingPeriodInMs time, in milliseconds counted from the creation
   *        of this object, during which {@link #invalidateWork} hands out
   *        nothing
   */
  InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) {
    this.blockInvalidateLimit = blockInvalidateLimit;
    this.pendingPeriodInMs = pendingPeriodInMs;
    printBlockDeletionTime(BlockManager.LOG);
  }

  /**
   * Logs the configured pending period and the approximate wall-clock time
   * at which block deletion will start.
   *
   * @param log the logger to write to
   */
  private void printBlockDeletionTime(final Logger log) {
    log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY
        + " is set to " + DFSUtil.durationToString(pendingPeriodInMs));
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss");
    Calendar calendar = new GregorianCalendar();
    // Add the delay in long milliseconds. Narrowing the seconds count to an
    // int (as Calendar.add requires) would wrap for very large delays and
    // report a bogus start time.
    calendar.setTimeInMillis(calendar.getTimeInMillis() + pendingPeriodInMs);
    log.info("The block deletion will start around "
        + sdf.format(calendar.getTime()));
  }

  /** @return the number of blocks to be invalidated . */
  synchronized long numBlocks() {
    return numBlocks;
  }

  /**
   * @return true if the given datanode has the given block listed for
   * invalidation. Blocks are compared including their generation stamps:
   * if a block is pending invalidation but with a different generation stamp,
   * returns false.
   */
  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
    final LightWeightHashSet<Block> s = node2blocks.get(dn);
    if (s == null) {
      return false; // no invalidate blocks for this datanode
    }
    Block blockInSet = s.getElement(block);
    return blockInSet != null &&
        block.getGenerationStamp() == blockInSet.getGenerationStamp();
  }

  /**
   * Add a block to the block collection
   * which will be invalidated on the specified datanode.
   *
   * @param block the block to invalidate
   * @param datanode the datanode the block is queued for
   * @param log whether to write a block-state-change log entry when the
   *        block was newly added
   */
  synchronized void add(final Block block, final DatanodeInfo datanode,
      final boolean log) {
    LightWeightHashSet<Block> set = node2blocks.get(datanode);
    if (set == null) {
      set = new LightWeightHashSet<Block>();
      node2blocks.put(datanode, set);
    }
    // Count and log only when the set did not already hold the block.
    if (set.add(block)) {
      numBlocks++;
      if (log) {
        NameNode.blockStateChangeLog.info("BLOCK* {}: add {} to {}",
            getClass().getSimpleName(), block, datanode);
      }
    }
  }

  /** Remove a datanode, and all blocks queued for it, from the map. */
  synchronized void remove(final DatanodeInfo dn) {
    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
    if (blocks != null) {
      numBlocks -= blocks.size();
    }
  }

  /** Remove the block from the specified datanode's set. */
  synchronized void remove(final DatanodeInfo dn, final Block block) {
    final LightWeightHashSet<Block> v = node2blocks.get(dn);
    if (v != null && v.remove(block)) {
      numBlocks--;
      // Drop the map entry once its last block is gone.
      if (v.isEmpty()) {
        node2blocks.remove(dn);
      }
    }
  }

  /** Print the contents to out. */
  synchronized void dump(final PrintWriter out) {
    final int size = node2blocks.size();
    out.println("Metasave: Blocks " + numBlocks
        + " waiting deletion from " + size + " datanodes.");
    if (size == 0) {
      return;
    }

    for (Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry
        : node2blocks.entrySet()) {
      final LightWeightHashSet<Block> blocks = entry.getValue();
      if (blocks.size() > 0) {
        out.println(entry.getKey());
        out.println(blocks);
      }
    }
  }

  /** @return a new list holding the datanodes that currently have an entry. */
  synchronized List<DatanodeInfo> getDatanodes() {
    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
  }

  /**
   * @return the remaining pending time in milliseconds; zero or negative
   * once the pending period has elapsed
   */
  @VisibleForTesting
  long getInvalidationDelay() {
    return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
  }

  /**
   * Polls up to {@code blockInvalidateLimit} blocks queued for the given
   * datanode and passes them to
   * {@link DatanodeDescriptor#addBlocksToBeInvalidated}.
   *
   * @param dn the datanode to schedule invalidation work for
   * @return the blocks handed to the datanode, or {@code null} if the
   *         pending period has not elapsed yet or nothing is queued for
   *         {@code dn}
   */
  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
    final long delay = getInvalidationDelay();
    if (delay > 0) {
      if (BlockManager.LOG.isDebugEnabled()) {
        BlockManager.LOG
            .debug("Block deletion is delayed during NameNode startup. "
                + "The deletion will start after " + delay + " ms.");
      }
      return null;
    }
    final LightWeightHashSet<Block> set = node2blocks.get(dn);
    if (set == null) {
      return null;
    }

    // # blocks that can be sent in one message is limited
    final int limit = blockInvalidateLimit;
    final List<Block> toInvalidate = set.pollN(limit);

    // If we send everything in this message, remove this node entry.
    // The set is empty here, so remove(dn) leaves numBlocks untouched.
    if (set.isEmpty()) {
      remove(dn);
    }

    dn.addBlocksToBeInvalidated(toInvalidate);
    numBlocks -= toInvalidate.size();
    return toInvalidate;
  }

  /** Discard all queued blocks for all datanodes and reset the counter. */
  synchronized void clear() {
    node2blocks.clear();
    numBlocks = 0;
  }
}