/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;

/**
 * This class provides rudimentary checking of DFS volumes for errors and
 * sub-optimal conditions.
 * <p>The tool scans all files and directories, starting from an indicated
 * root path. The following abnormal conditions are detected and handled:</p>
 * <ul>
 * <li>files with blocks that are completely missing from all datanodes.<br/>
 * In this case the tool can perform one of the following actions:
 *  <ul>
 *      <li>none (the default)</li>
 *      <li>move corrupted files to the /lost+found directory on DFS
 *      (the -move option). Remaining data blocks are saved as
 *      block chains, each representing the longest consecutive series
 *      of valid blocks.</li>
 *      <li>delete corrupted files (the -delete option)</li>
 *  </ul>
 *  </li>
 *  <li>files with under-replicated or over-replicated blocks</li>
 * </ul>
 * Additionally, the tool collects detailed overall DFS statistics, and
 * optionally can print detailed statistics on block locations and the
 * replication factor of each file.
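 * <p>For example, a client-side invocation exercising these options might
 * look like the following (an illustrative sketch; each flag corresponds to
 * one of the servlet parameters parsed in the constructor below):</p>
 * <pre>
 *   hdfs fsck /user/alice -files -blocks -locations   # verbose per-file report
 *   hdfs fsck /user/alice -move                       # salvage to /lost+found
 *   hdfs fsck /user/alice -list-corruptfileblocks     # corrupt blocks + cookie
 * </pre>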
 */
@InterfaceAudience.Private
public class NamenodeFsck implements DataEncryptionKeyFactory {
  public static final Log LOG = LogFactory.getLog(NameNode.class.getName());

  // Status strings returned to mark the overall fsck result.
  public static final String CORRUPT_STATUS = "is CORRUPT";
  public static final String HEALTHY_STATUS = "is HEALTHY";
  public static final String DECOMMISSIONING_STATUS = "is DECOMMISSIONING";
  public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED";
  public static final String NONEXISTENT_STATUS = "does not exist";
  public static final String FAILURE_STATUS = "FAILED";

  private final NameNode namenode;
  private final NetworkTopology networktopology;
  private final int totalDatanodes;
  private final InetAddress remoteAddress;

  private String lostFound = null;
  private boolean lfInited = false;
  private boolean lfInitedOk = false;
  private boolean showFiles = false;
  private boolean showOpenFiles = false;
  private boolean showBlocks = false;
  private boolean showLocations = false;
  private boolean showRacks = false;
  private boolean showStoragePolicies = false;
  private boolean showCorruptFileBlocks = false;
  /**
   * True if we encountered an internal error during FSCK, such as not being
   * able to delete a corrupt file.
   */
  private boolean internalError = false;

  /**
   * True if the user specified the -move option.
   *
   * When this option is in effect, we will copy salvaged blocks into the
   * lost+found directory.
   */
  private boolean doMove = false;

  /**
   * True if the user specified the -delete option.
   *
   * When this option is in effect, we will delete corrupted files.
   */
  private boolean doDelete = false;

  String path = "/";

  private String blockIds = null;

  // We return N corrupt files at a time; the list of files returned is
  // ordered by block id. To allow continuation, callers pass the last block
  // id from the previous call back in as a cookie.
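  // For example (an illustrative sketch of the cookie protocol): a first
  // call made with cookie "0" lists an initial batch of corrupt files and
  // overwrites currentCookie[0] with a new cookie; handing that value back
  // on the next call resumes the listing where the previous one stopped.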
  private final String[] currentCookie = new String[] { null };

  private final Configuration conf;
  private final PrintWriter out;
  private List<String> snapshottableDirs = null;

  private final BlockPlacementPolicy bpPolicy;
  private StoragePolicySummary storageTypeSummary = null;

  /**
   * Filesystem checker.
   * @param conf configuration (namenode config)
   * @param namenode namenode that this fsck is going to use
   * @param networktopology the namenode's view of the network topology
   * @param pmap key=value[] map passed to the http servlet as url parameters
   * @param out output stream to write the fsck output
   * @param totalDatanodes number of live datanodes
   * @param remoteAddress source address of the fsck request
   */
  NamenodeFsck(Configuration conf, NameNode namenode,
      NetworkTopology networktopology,
      Map<String,String[]> pmap, PrintWriter out,
      int totalDatanodes, InetAddress remoteAddress) {
    this.conf = conf;
    this.namenode = namenode;
    this.networktopology = networktopology;
    this.out = out;
    this.totalDatanodes = totalDatanodes;
    this.remoteAddress = remoteAddress;
    this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
        networktopology,
        namenode.getNamesystem().getBlockManager().getDatanodeManager()
        .getHost2DatanodeMap());

    for (String key : pmap.keySet()) {
      if (key.equals("path")) { this.path = pmap.get("path")[0]; }
      else if (key.equals("move")) { this.doMove = true; }
      else if (key.equals("delete")) { this.doDelete = true; }
      else if (key.equals("files")) { this.showFiles = true; }
      else if (key.equals("blocks")) { this.showBlocks = true; }
      else if (key.equals("locations")) { this.showLocations = true; }
      else if (key.equals("racks")) { this.showRacks = true; }
      else if (key.equals("storagepolicies")) { this.showStoragePolicies = true; }
      else if (key.equals("openforwrite")) { this.showOpenFiles = true; }
      else if (key.equals("listcorruptfileblocks")) {
        this.showCorruptFileBlocks = true;
      } else if (key.equals("startblockafter")) {
        this.currentCookie[0] = pmap.get("startblockafter")[0];
      } else if (key.equals("includeSnapshots")) {
        this.snapshottableDirs = new ArrayList<String>();
      } else if (key.equals("blockId")) {
        this.blockIds = pmap.get("blockId")[0];
      }
    }
  }
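
  // The pmap above comes straight from the fsck servlet's query string. As
  // an illustrative sketch (the keys are exactly those parsed above; the
  // precise URL shape depends on the servlet wiring), a request such as
  //
  //   /fsck?path=/user/alice&files=1&blocks=1&racks=1
  //
  // would produce pmap entries {"path" -> ["/user/alice"], "files" -> ["1"],
  // "blocks" -> ["1"], "racks" -> ["1"]}, enabling the verbose per-file,
  // per-block, and rack-location output.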

  /**
   * Check block information given a blockId number.
   */
  public void blockIdCK(String blockId) {

    if (blockId == null) {
      out.println("Please provide valid blockId!");
      return;
    }

    BlockManager bm = namenode.getNamesystem().getBlockManager();
    try {
      // get blockInfo
      Block block = new Block(Block.getBlockId(blockId));
      // find which file this block belongs to
      BlockInfoContiguous blockInfo = bm.getStoredBlock(block);
      if (blockInfo == null) {
        out.println("Block " + blockId + " " + NONEXISTENT_STATUS);
        LOG.warn("Block " + blockId + " " + NONEXISTENT_STATUS);
        return;
      }
      BlockCollection bc = bm.getBlockCollection(blockInfo);
      INode iNode = (INode) bc;
      NumberReplicas numberReplicas = bm.countNodes(block);
      out.println("Block Id: " + blockId);
      out.println("Block belongs to: " + iNode.getFullPathName());
      out.println("No. of Expected Replica: " + bc.getBlockReplication());
      out.println("No. of live Replica: " + numberReplicas.liveReplicas());
      out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
      out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes());
      out.println("No. of decommission Replica: "
          + numberReplicas.decommissionedReplicas());
      out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas());
      // record datanodes that have corrupted block replica
      Collection<DatanodeDescriptor> corruptionRecord =
          bm.getCorruptReplicas(block);

      // report block replicas status on datanodes
      for (int idx = (blockInfo.numNodes() - 1); idx >= 0; idx--) {
        DatanodeDescriptor dn = blockInfo.getDatanode(idx);
        out.print("Block replica on datanode/rack: " + dn.getHostName() +
            dn.getNetworkLocation() + " ");
        if (corruptionRecord != null && corruptionRecord.contains(dn)) {
          out.print(CORRUPT_STATUS + "\t ReasonCode: " +
              bm.getCorruptReason(block, dn));
        } else if (dn.isDecommissioned()) {
          out.print(DECOMMISSIONED_STATUS);
        } else if (dn.isDecommissionInProgress()) {
          out.print(DECOMMISSIONING_STATUS);
        } else {
          out.print(HEALTHY_STATUS);
        }
        out.print("\n");
      }
    } catch (Exception e) {
      String errMsg = "Fsck on blockId '" + blockId + "' " + FAILURE_STATUS;
      LOG.warn(errMsg, e);
      out.println(e.getMessage());
      out.print("\n\n" + errMsg);
    }
  }

  /**
   * Check files on DFS, starting from the indicated path.
   */
  public void fsck() {
    final long startTime = Time.monotonicNow();
    try {
      if (blockIds != null) {
        String[] blocks = blockIds.split(" ");
        StringBuilder sb = new StringBuilder();
        sb.append("FSCK started by " +
            UserGroupInformation.getCurrentUser() + " from " +
            remoteAddress + " at " + new Date());
        out.println(sb.toString());
        sb.append(" for blockIds: \n");
        for (String blk : blocks) {
          if (blk == null || !blk.contains(Block.BLOCK_FILE_PREFIX)) {
            out.println("Incorrect blockId format: " + blk);
            continue;
          }
          out.print("\n");
          blockIdCK(blk);
          sb.append(blk + "\n");
        }
        LOG.info(sb.toString());
        namenode.getNamesystem().logFsckEvent("/", remoteAddress);
        out.flush();
        return;
      }

      String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
          + " from " + remoteAddress + " for path " + path + " at " + new Date();
      LOG.info(msg);
      out.println(msg);
      namenode.getNamesystem().logFsckEvent(path, remoteAddress);

      if (snapshottableDirs != null) {
        SnapshottableDirectoryStatus[] snapshotDirs = namenode.getRpcServer()
            .getSnapshottableDirListing();
        if (snapshotDirs != null) {
          for (SnapshottableDirectoryStatus dir : snapshotDirs) {
            snapshottableDirs.add(dir.getFullPath().toString());
          }
        }
      }

      final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
      if (file != null) {

        if (showCorruptFileBlocks) {
          listCorruptFileBlocks();
          return;
        }

        if (this.showStoragePolicies) {
          storageTypeSummary = new StoragePolicySummary(
              namenode.getNamesystem().getBlockManager().getStoragePolicies());
        }

        Result res = new Result(conf);

        check(path, file, res);

        out.println(res);
        out.println(" Number of data-nodes:\t\t" + totalDatanodes);
        out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());

        if (this.showStoragePolicies) {
          out.print(storageTypeSummary.toString());
        }

        out.println("FSCK ended at " + new Date() + " in "
            + (Time.monotonicNow() - startTime) + " milliseconds");

        // If there were internal errors during the fsck operation, we want to
        // return FAILURE_STATUS, even if those errors were not immediately
        // fatal.  Otherwise many unit tests will pass even when there are bugs.
        if (internalError) {
          throw new IOException("fsck encountered internal errors!");
        }

        // DFSck client scans for the string HEALTHY/CORRUPT to check the status
        // of file system and return appropriate code. Changing the output
        // string might break testcases. Also note this must be the last line
        // of the report.
        if (res.isHealthy()) {
          out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
        } else {
          out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
        }

      } else {
        out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
      }
    } catch (Exception e) {
      String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
      LOG.warn(errMsg, e);
      out.println("FSCK ended at " + new Date() + " in "
          + (Time.monotonicNow() - startTime) + " milliseconds");
      out.println(e.getMessage());
      out.print("\n\n" + errMsg);
    } finally {
      out.close();
    }
  }

  private void listCorruptFileBlocks() throws IOException {
    Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
        getNamesystem().listCorruptFileBlocks(path, currentCookie);
    int numCorruptFiles = corruptFiles.size();
    String filler;
    if (numCorruptFiles > 0) {
      filler = Integer.toString(numCorruptFiles);
    } else if (currentCookie[0].equals("0")) {
      filler = "no";
    } else {
      filler = "no more";
    }
    out.println("Cookie:\t" + currentCookie[0]);
    for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) {
      out.println(c.toString());
    }
    out.println("\n\nThe filesystem under path '" + path + "' has " + filler
        + " CORRUPT files");
    out.println();
  }

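  /**
   * Recursively check the file or directory named by {@code parent} and
   * {@code file}, accumulating totals and problems into {@code res}.
   * Directories are walked via partial listings (including their .snapshot
   * subtree when -includeSnapshots is in effect); symlinks are only counted;
   * for regular files, each block's replication, placement, and corruption
   * state is examined and reported according to the show* options.
   */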
  @VisibleForTesting
  void check(String parent, HdfsFileStatus file, Result res) throws IOException {
    String path = file.getFullName(parent);
    boolean isOpen = false;

    if (file.isDir()) {
      if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
        String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
            + Path.SEPARATOR)
            + HdfsConstants.DOT_SNAPSHOT_DIR;
        HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
            snapshotPath);
        check(snapshotPath, snapshotFileInfo, res);
      }
      byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
      DirectoryListing thisListing;
      if (showFiles) {
        out.println(path + " <dir>");
      }
      res.totalDirs++;
      do {
        assert lastReturnedName != null;
        thisListing = namenode.getRpcServer().getListing(
            path, lastReturnedName, false);
        if (thisListing == null) {
          return;
        }
        for (HdfsFileStatus f : thisListing.getPartialListing()) {
          check(path, f, res);
        }
        lastReturnedName = thisListing.getLastName();
      } while (thisListing.hasMore());
      return;
    }
    if (file.isSymlink()) {
      if (showFiles) {
        out.println(path + " <symlink>");
      }
      res.totalSymlinks++;
      return;
    }
    long fileLen = file.getLen();
    // Get block locations without updating the file access time
    // and without block access tokens
    LocatedBlocks blocks = null;
    FSNamesystem fsn = namenode.getNamesystem();
    fsn.readLock();
    try {
      blocks = fsn.getBlockLocations(
          fsn.getPermissionChecker(), path, 0, fileLen, false, false)
          .blocks;
    } catch (FileNotFoundException fnfe) {
      blocks = null;
    } finally {
      fsn.readUnlock();
    }
    if (blocks == null) { // the file is deleted
      return;
    }
    isOpen = blocks.isUnderConstruction();
    if (isOpen && !showOpenFiles) {
      // We collect these stats about open files to report with default options
      res.totalOpenFilesSize += fileLen;
      res.totalOpenFilesBlocks += blocks.locatedBlockCount();
      res.totalOpenFiles++;
      return;
    }
    res.totalFiles++;
    res.totalSize += fileLen;
    res.totalBlocks += blocks.locatedBlockCount();
    if (showOpenFiles && isOpen) {
      out.print(path + " " + fileLen + " bytes, " +
          blocks.locatedBlockCount() + " block(s), OPENFORWRITE: ");
    } else if (showFiles) {
      out.print(path + " " + fileLen + " bytes, " +
          blocks.locatedBlockCount() + " block(s): ");
    } else {
      out.print('.');
    }
    if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }
    int missing = 0;
    int corrupt = 0;
    long missize = 0;
    int underReplicatedPerFile = 0;
    int misReplicatedPerFile = 0;
    StringBuilder report = new StringBuilder();
    int i = 0;
    for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
      ExtendedBlock block = lBlk.getBlock();
      boolean isCorrupt = lBlk.isCorrupt();
      String blkName = block.toString();
      DatanodeInfo[] locs = lBlk.getLocations();
      NumberReplicas numberReplicas =
          namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
      int liveReplicas = numberReplicas.liveReplicas();
      res.totalReplicas += liveReplicas;
      short targetFileReplication = file.getReplication();
      res.numExpectedReplicas += targetFileReplication;
      if (liveReplicas < res.minReplication) {
        res.numUnderMinReplicatedBlocks++;
      }
      if (liveReplicas > targetFileReplication) {
        res.excessiveReplicas += (liveReplicas - targetFileReplication);
        res.numOverReplicatedBlocks += 1;
      }
      // keep track of storage tier counts
      if (this.showStoragePolicies && lBlk.getStorageTypes() != null) {
        StorageType[] storageTypes = lBlk.getStorageTypes();
        storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
            fsn.getBlockManager().getStoragePolicy(file.getStoragePolicy()));
      }
      // Check if block is Corrupt
      if (isCorrupt) {
        corrupt++;
        res.corruptBlocks++;
        out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
            " block " + block.getBlockName() + "\n");
      }
      if (liveReplicas >= res.minReplication) {
        res.numMinReplicatedBlocks++;
      }
      if (liveReplicas < targetFileReplication && liveReplicas > 0) {
        res.missingReplicas += (targetFileReplication - liveReplicas);
        res.numUnderReplicatedBlocks += 1;
        underReplicatedPerFile++;
        if (!showFiles) {
          out.print("\n" + path + ": ");
        }
        out.println(" Under replicated " + block +
                    ". Target Replicas is " +
                    targetFileReplication + " but found " +
                    liveReplicas + " replica(s).");
      }

      // count mis replicated blocks
      BlockPlacementStatus blockPlacementStatus = bpPolicy
          .verifyBlockPlacement(path, lBlk, targetFileReplication);
      if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
        res.numMisReplicatedBlocks++;
        misReplicatedPerFile++;
        if (!showFiles) {
          if (underReplicatedPerFile == 0) {
            out.println();
          }
          out.print(path + ": ");
        }
        out.println(" Replica placement policy is violated for " +
                    block + ". " + blockPlacementStatus.getErrorDescription());
      }
      report.append(i + ". " + blkName + " len=" + block.getNumBytes());
      if (liveReplicas == 0) {
        report.append(" MISSING!");
        res.addMissing(block.toString(), block.getNumBytes());
        missing++;
        missize += block.getNumBytes();
      } else {
        report.append(" repl=" + liveReplicas);
        if (showLocations || showRacks) {
          StringBuilder sb = new StringBuilder("[");
          for (int j = 0; j < locs.length; j++) {
            if (j > 0) { sb.append(", "); }
            if (showRacks) {
              sb.append(NodeBase.getPath(locs[j]));
            } else {
              sb.append(locs[j]);
            }
          }
          sb.append(']');
          report.append(" " + sb.toString());
        }
      }
      report.append('\n');
      i++;
    }
    if ((missing > 0) || (corrupt > 0)) {
      if (!showFiles && (missing > 0)) {
        out.print("\n" + path + ": MISSING " + missing
            + " blocks of total size " + missize + " B.");
      }
      res.corruptFiles++;
      if (isOpen) {
        LOG.info("Fsck: ignoring open file " + path);
      } else {
        if (doMove) {
          copyBlocksToLostFound(parent, file, blocks);
        }
        if (doDelete) {
          deleteCorruptedFile(path);
        }
      }
    }
    if (showFiles) {
      if (missing > 0) {
        out.print(" MISSING " + missing + " blocks of total size " + missize + " B\n");
      } else if (underReplicatedPerFile == 0 && misReplicatedPerFile == 0) {
        out.print(" OK\n");
      }
      if (showBlocks) {
        out.print(report.toString() + "\n");
      }
    }
  }

  private void deleteCorruptedFile(String path) {
    try {
      namenode.getRpcServer().delete(path, true);
      LOG.info("Fsck: deleted corrupt file " + path);
    } catch (Exception e) {
      LOG.error("Fsck: error deleting corrupted file " + path, e);
      internalError = true;
    }
  }

  boolean hdfsPathExists(String path)
      throws AccessControlException, UnresolvedLinkException, IOException {
    try {
      HdfsFileStatus hfs = namenode.getRpcServer().getFileInfo(path);
      return (hfs != null);
    } catch (FileNotFoundException e) {
      return false;
    }
  }

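  /**
   * Salvage what remains of a corrupt file into /lost+found. Under the
   * original file's path inside lost+found, the readable blocks are written
   * out as numbered "chain" files (0, 1, ...), where each chain holds the
   * longest consecutive run of blocks that still have live replicas; a
   * block with no locations ends the current chain.
   */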
  private void copyBlocksToLostFound(String parent, HdfsFileStatus file,
        LocatedBlocks blocks) throws IOException {
    final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
    final String fullName = file.getFullName(parent);
    OutputStream fos = null;
    try {
      if (!lfInited) {
        lostFoundInit(dfs);
      }
      if (!lfInitedOk) {
        throw new IOException("failed to initialize lost+found");
      }
      String target = lostFound + fullName;
      if (hdfsPathExists(target)) {
        LOG.warn("Fsck: can't copy the remains of " + fullName + " to " +
            "lost+found, because " + target + " already exists.");
        return;
      }
      if (!namenode.getRpcServer().mkdirs(
          target, file.getPermission(), true)) {
        throw new IOException("failed to create directory " + target);
      }
      // create chains
      int chain = 0;
      boolean copyError = false;
      for (LocatedBlock lblock : blocks.getLocatedBlocks()) {
        DatanodeInfo[] locs = lblock.getLocations();
        if (locs == null || locs.length == 0) {
          if (fos != null) {
            fos.flush();
            fos.close();
            fos = null;
          }
          continue;
        }
        if (fos == null) {
          fos = dfs.create(target + "/" + chain, true);
          chain++;
        }

        // copy the block. It's a pity it's not abstracted from DFSInputStream ...
        try {
          copyBlock(dfs, lblock, fos);
        } catch (Exception e) {
          LOG.error("Fsck: could not copy block " + lblock.getBlock() +
              " to " + target, e);
          fos.flush();
          fos.close();
          fos = null;
          internalError = true;
          copyError = true;
        }
      }
      if (copyError) {
        LOG.warn("Fsck: there were errors copying the remains of the " +
            "corrupted file " + fullName + " to /lost+found");
      } else {
        LOG.info("Fsck: copied the remains of the corrupted file " +
            fullName + " to /lost+found");
      }
    } catch (Exception e) {
      LOG.error("copyBlocksToLostFound: error processing " + fullName, e);
      internalError = true;
    } finally {
      if (fos != null) {
        fos.close();
      }
      dfs.close();
    }
  }

  /*
   * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient},
   * which is bad. Both places should be refactored to provide a method to
   * copy blocks around.
   */
  private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
                         OutputStream fos) throws Exception {
    int failures = 0;
    InetSocketAddress targetAddr = null;
    TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
    BlockReader blockReader = null;
    ExtendedBlock block = lblock.getBlock();

    while (blockReader == null) {
      DatanodeInfo chosenNode;

      try {
        chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes);
        targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
      } catch (IOException ie) {
        if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) {
          throw new IOException("Could not obtain block " + lblock, ie);
        }
        LOG.info("Could not obtain block from any node: " + ie);
        try {
          Thread.sleep(10000);
        } catch (InterruptedException iex) {
          // Ignore the interruption; we retry below with a fresh dead list.
        }
        deadNodes.clear();
        failures++;
        continue;
      }
      try {
        String file = BlockReaderFactory.getFileName(targetAddr,
            block.getBlockPoolId(), block.getBlockId());
        blockReader = new BlockReaderFactory(dfs.getConf()).
            setFileName(file).
            setBlock(block).
            setBlockToken(lblock.getBlockToken()).
            setStartOffset(0).
            setLength(-1).
            setVerifyChecksum(true).
            setClientName("fsck").
            setDatanodeInfo(chosenNode).
            setInetSocketAddress(targetAddr).
            setCachingStrategy(CachingStrategy.newDropBehind()).
            setClientCacheContext(dfs.getClientContext()).
            setConfiguration(namenode.conf).
            setRemotePeerFactory(new RemotePeerFactory() {
              @Override
              public Peer newConnectedPeer(InetSocketAddress addr,
                  Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                  throws IOException {
                Peer peer = null;
                Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
                try {
                  s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                  s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
                  peer = TcpPeerServer.peerFromSocketAndKey(
                        dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
                        blockToken, datanodeId);
                } finally {
                  if (peer == null) {
                    IOUtils.closeQuietly(s);
                  }
                }
                return peer;
              }
            }).
            build();
      } catch (IOException ex) {
        // Put chosen node into dead list, continue
        LOG.info("Failed to connect to " + targetAddr + ": " + ex);
        deadNodes.add(chosenNode);
      }
    }
    byte[] buf = new byte[1024];
    int cnt = 0;
    boolean success = true;
    long bytesRead = 0;
    try {
      while ((cnt = blockReader.read(buf, 0, buf.length)) > 0) {
        fos.write(buf, 0, cnt);
        bytesRead += cnt;
      }
      if (bytesRead != block.getNumBytes()) {
        throw new IOException("Recorded block size is " + block.getNumBytes() +
                              ", but datanode returned " + bytesRead + " bytes");
      }
    } catch (Exception e) {
      LOG.error("Error reading block", e);
      success = false;
    } finally {
      blockReader.close();
    }
    if (!success) {
      throw new Exception("Could not copy block data for " + lblock.getBlock());
    }
  }

  @Override
  public DataEncryptionKey newDataEncryptionKey() throws IOException {
    return namenode.getRpcServer().getDataEncryptionKey();
  }

  /*
   * XXX (ab) See comment above for copyBlock().
   *
   * Pick a node from which to stream the data: a live replica chosen at
   * random (the implementation does not currently prefer the local node).
   */
  private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
                                TreeSet<DatanodeInfo> deadNodes) throws IOException {
    if ((nodes == null) ||
        (nodes.length - deadNodes.size() < 1)) {
      throw new IOException("No live nodes contain current block");
    }
    DatanodeInfo chosenNode;
    do {
      chosenNode = nodes[DFSUtil.getRandom().nextInt(nodes.length)];
    } while (deadNodes.contains(chosenNode));
    return chosenNode;
  }

  private void lostFoundInit(DFSClient dfs) {
    lfInited = true;
    try {
      String lfName = "/lost+found";

      final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName);
      if (lfStatus == null) { // not exists
        lfInitedOk = dfs.mkdirs(lfName, null, true);
        lostFound = lfName;
      } else if (!lfStatus.isDir()) { // exists but not a directory
        LOG.warn("Cannot use /lost+found : a regular file with this name exists.");
        lfInitedOk = false;
      } else { // exists and is a directory
        lostFound = lfName;
        lfInitedOk = true;
      }
    } catch (Exception e) {
      LOG.warn("Could not initialize /lost+found", e);
      lfInitedOk = false;
    }
    if (lostFound == null) {
      LOG.warn("Cannot initialize /lost+found .");
      lfInitedOk = false;
      internalError = true;
    }
  }

  /**
   * FsckResult of checking, plus overall DFS statistics.
   */
  @VisibleForTesting
  static class Result {
    final List<String> missingIds = new ArrayList<String>();
    long missingSize = 0L;
    long corruptFiles = 0L;
    long corruptBlocks = 0L;
    long excessiveReplicas = 0L;
    long missingReplicas = 0L;
    long numUnderMinReplicatedBlocks = 0L;
    long numOverReplicatedBlocks = 0L;
    long numUnderReplicatedBlocks = 0L;
    long numMisReplicatedBlocks = 0L;  // blocks that do not satisfy block placement policy
    long numMinReplicatedBlocks = 0L;  // minimally replicated blocks
    long totalBlocks = 0L;
    long numExpectedReplicas = 0L;
    long totalOpenFilesBlocks = 0L;
    long totalFiles = 0L;
    long totalOpenFiles = 0L;
    long totalDirs = 0L;
    long totalSymlinks = 0L;
    long totalSize = 0L;
    long totalOpenFilesSize = 0L;
    long totalReplicas = 0L;

    final short replication;
    final int minReplication;

    Result(Configuration conf) {
      this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
                                            DFSConfigKeys.DFS_REPLICATION_DEFAULT);
      this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
                                            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
    }

    /**
     * DFS is considered healthy if there are no missing or corrupt blocks.
     */
    boolean isHealthy() {
      return missingIds.isEmpty() && corruptBlocks == 0;
    }

    /** Add a missing block name, plus its size. */
    void addMissing(String id, long size) {
      missingIds.add(id);
      missingSize += size;
    }

    /** Return the actual replication factor. */
    float getReplicationFactor() {
      if (totalBlocks == 0) {
        return 0.0f;
      }
      return (float) (totalReplicas) / (float) totalBlocks;
    }
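    // Worked example (illustrative): three blocks with two live replicas
    // each give totalReplicas = 6 and totalBlocks = 3, so the "Average
    // block replication" line reported by toString() below is 6 / 3 = 2.0.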

    @Override
    public String toString() {
      StringBuilder res = new StringBuilder();
      res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT"))
          .append("\n Total size:\t").append(totalSize).append(" B");
      if (totalOpenFilesSize != 0) {
        res.append(" (Total open files size: ").append(totalOpenFilesSize)
            .append(" B)");
      }
      res.append("\n Total dirs:\t").append(totalDirs).append(
          "\n Total files:\t").append(totalFiles);
      res.append("\n Total symlinks:\t\t").append(totalSymlinks);
      if (totalOpenFiles != 0) {
        res.append(" (Files currently being written: ").append(totalOpenFiles)
            .append(")");
      }
      res.append("\n Total blocks (validated):\t").append(totalBlocks);
      if (totalBlocks > 0) {
        res.append(" (avg. block size ").append((totalSize / totalBlocks))
            .append(" B)");
      }
      if (totalOpenFilesBlocks != 0) {
        res.append(" (Total open file blocks (not validated): ").append(
            totalOpenFilesBlocks).append(")");
      }
      if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) {
        res.append("\n  ********************************");
        if (numUnderMinReplicatedBlocks > 0) {
          res.append("\n  UNDER MIN REPL'D BLOCKS:\t").append(numUnderMinReplicatedBlocks);
          if (totalBlocks > 0) {
            res.append(" (").append(
                ((float) (numUnderMinReplicatedBlocks * 100) / (float) totalBlocks))
                .append(" %)");
          }
          res.append("\n  ").append(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + ":\t")
             .append(minReplication);
        }
        if (corruptFiles > 0) {
          res.append("\n  CORRUPT FILES:\t").append(corruptFiles);
          if (missingSize > 0) {
            res.append("\n  MISSING BLOCKS:\t").append(missingIds.size()).append(
                "\n  MISSING SIZE:\t\t").append(missingSize).append(" B");
          }
          if (corruptBlocks > 0) {
            res.append("\n  CORRUPT BLOCKS: \t").append(corruptBlocks);
          }
        }
        res.append("\n  ********************************");
      }
      res.append("\n Minimally replicated blocks:\t").append(
          numMinReplicatedBlocks);
      if (totalBlocks > 0) {
        res.append(" (").append(
            ((float) (numMinReplicatedBlocks * 100) / (float) totalBlocks))
            .append(" %)");
      }
      res.append("\n Over-replicated blocks:\t")
          .append(numOverReplicatedBlocks);
      if (totalBlocks > 0) {
        res.append(" (").append(
            ((float) (numOverReplicatedBlocks * 100) / (float) totalBlocks))
            .append(" %)");
      }
      res.append("\n Under-replicated blocks:\t").append(
          numUnderReplicatedBlocks);
      if (totalBlocks > 0) {
        res.append(" (").append(
            ((float) (numUnderReplicatedBlocks * 100) / (float) totalBlocks))
            .append(" %)");
      }
      res.append("\n Mis-replicated blocks:\t\t")
          .append(numMisReplicatedBlocks);
      if (totalBlocks > 0) {
        res.append(" (").append(
            ((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks))
            .append(" %)");
      }
      res.append("\n Default replication factor:\t").append(replication)
          .append("\n Average block replication:\t").append(
              getReplicationFactor()).append("\n Corrupt blocks:\t\t").append(
              corruptBlocks).append("\n Missing replicas:\t\t").append(
              missingReplicas);
      // Guard on the denominator (numExpectedReplicas), not totalReplicas,
      // so the percentage is skipped exactly when it would divide by zero.
      if (numExpectedReplicas > 0) {
        res.append(" (").append(
            ((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append(
            " %)");
      }
      return res.toString();
    }
  }
}