/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
/**
 * Keeps a Collection for every named machine containing blocks
 * that have recently been invalidated and are thought to live
 * on the machine in question.
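 *
 * <p>A minimal usage sketch (hypothetical values; in practice the
 * BlockManager drives these calls):
 * <pre>{@code
 *   InvalidateBlocks ib = new InvalidateBlocks(1000, 0L);
 *   ib.add(replica, datanodeInfo, true);  // queue one replica for deletion
 *   List<Block> batch = ib.invalidateWork(datanodeDescriptor);
 * }</pre>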
 */
@InterfaceAudience.Private
class InvalidateBlocks {
  /** Mapping: DatanodeInfo -> Collection of Blocks */
  private final Map<DatanodeInfo, LightWeightHashSet<Block>> node2blocks =
      new TreeMap<DatanodeInfo, LightWeightHashSet<Block>>();
  /** The total number of blocks in the map. */
  private long numBlocks = 0L;

  private final int blockInvalidateLimit;

  /**
   * How long block invalidation is deferred after NameNode startup,
   * in milliseconds.
   */
  private final long pendingPeriodInMs;
  /** The NameNode startup time, from the monotonic clock. */
  private final long startupTime = Time.monotonicNow();

  InvalidateBlocks(final int blockInvalidateLimit, long pendingPeriodInMs) {
    this.blockInvalidateLimit = blockInvalidateLimit;
    this.pendingPeriodInMs = pendingPeriodInMs;
    printBlockDeletionTime(BlockManager.LOG);
  }

  private void printBlockDeletionTime(final Logger log) {
    log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY
        + " is set to " + DFSUtil.durationToString(pendingPeriodInMs));
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss");
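    // Estimate the wall-clock time at which deletion will begin by adding
    // the pending period to the current time.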
    Calendar calendar = new GregorianCalendar();
    calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000));
    log.info("The block deletion will start around "
        + sdf.format(calendar.getTime()));
  }

  /** @return the number of blocks to be invalidated. */
  synchronized long numBlocks() {
    return numBlocks;
  }

  /**
   * @return true if the given storage has the given block listed for
   * invalidation. Blocks are compared including their generation stamps:
   * if a block is pending invalidation but with a different generation stamp,
   * returns false.
   */
  synchronized boolean contains(final DatanodeInfo dn, final Block block) {
    final LightWeightHashSet<Block> s = node2blocks.get(dn);
    if (s == null) {
      return false; // no blocks pending invalidation for this datanode
    }
    Block blockInSet = s.getElement(block);
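    // The set matches blocks by ID, so a hit here may still carry a stale
    // generation stamp; the explicit stamp check below rejects such entries.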
    return blockInSet != null &&
        block.getGenerationStamp() == blockInSet.getGenerationStamp();
  }

  /**
   * Add a block to the block collection
   * which will be invalidated on the specified datanode.
   */
  synchronized void add(final Block block, final DatanodeInfo datanode,
      final boolean log) {
    LightWeightHashSet<Block> set = node2blocks.get(datanode);
    if (set == null) {
      set = new LightWeightHashSet<Block>();
      node2blocks.put(datanode, set);
    }
    if (set.add(block)) {
      numBlocks++;
      if (log) {
        NameNode.blockStateChangeLog.info("BLOCK* {}: add {} to {}",
            getClass().getSimpleName(), block, datanode);
      }
    }
  }

  /** Remove a datanode, and all of its pending blocks, from the map. */
  synchronized void remove(final DatanodeInfo dn) {
    final LightWeightHashSet<Block> blocks = node2blocks.remove(dn);
    if (blocks != null) {
      numBlocks -= blocks.size();
    }
  }

  /** Remove the block from the specified datanode. */
  synchronized void remove(final DatanodeInfo dn, final Block block) {
    final LightWeightHashSet<Block> v = node2blocks.get(dn);
    if (v != null && v.remove(block)) {
      numBlocks--;
      if (v.isEmpty()) {
        node2blocks.remove(dn);
      }
    }
  }

  /** Print the contents to out. */
  synchronized void dump(final PrintWriter out) {
    final int size = node2blocks.size();
    out.println("Metasave: Blocks " + numBlocks
        + " waiting deletion from " + size + " datanodes.");
    if (size == 0) {
      return;
    }

    for (Map.Entry<DatanodeInfo, LightWeightHashSet<Block>> entry :
        node2blocks.entrySet()) {
      final LightWeightHashSet<Block> blocks = entry.getValue();
      if (blocks.size() > 0) {
        out.println(entry.getKey());
        out.println(blocks);
      }
    }
  }

  /** @return a list of the datanodes. */
  synchronized List<DatanodeInfo> getDatanodes() {
    return new ArrayList<DatanodeInfo>(node2blocks.keySet());
  }

  /**
   * @return the remaining pending time, in milliseconds; a value of zero
   * or less means the startup delay has elapsed.
   */
  @VisibleForTesting
  long getInvalidationDelay() {
    return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
  }

  synchronized List<Block> invalidateWork(final DatanodeDescriptor dn) {
    final long delay = getInvalidationDelay();
    if (delay > 0) {
      BlockManager.LOG.debug("Block deletion is delayed during NameNode "
          + "startup. The deletion will start after {} ms.", delay);
      return null;
    }
    final LightWeightHashSet<Block> set = node2blocks.get(dn);
    if (set == null) {
      return null;
    }

    // # blocks that can be sent in one message is limited
    final int limit = blockInvalidateLimit;
    final List<Block> toInvalidate = set.pollN(limit);

    // If we send everything in this message, remove this node entry
    if (set.isEmpty()) {
      remove(dn);
    }

    dn.addBlocksToBeInvalidated(toInvalidate);
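    // The polled blocks are now queued on the DatanodeDescriptor and will be
    // handed to the datanode in a later heartbeat response.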
    numBlocks -= toInvalidate.size();
    return toInvalidate;
  }

  synchronized void clear() {
    node2blocks.clear();
    numBlocks = 0;
  }
}