1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.namenode; 19 20 import java.io.FileNotFoundException; 21 import java.io.IOException; 22 import java.io.OutputStream; 23 import java.io.PrintWriter; 24 import java.net.InetAddress; 25 import java.net.InetSocketAddress; 26 import java.net.Socket; 27 import java.util.ArrayList; 28 import java.util.Arrays; 29 import java.util.Collection; 30 import java.util.Date; 31 import java.util.Iterator; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.TreeSet; 35 36 import org.apache.commons.io.IOUtils; 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 import org.apache.hadoop.classification.InterfaceAudience; 40 import org.apache.hadoop.conf.Configuration; 41 import org.apache.hadoop.fs.Path; 42 import org.apache.hadoop.fs.UnresolvedLinkException; 43 import org.apache.hadoop.hdfs.BlockReader; 44 import org.apache.hadoop.hdfs.BlockReaderFactory; 45 import org.apache.hadoop.hdfs.DFSClient; 46 import org.apache.hadoop.hdfs.DFSConfigKeys; 47 import org.apache.hadoop.hdfs.DFSUtil; 48 import org.apache.hadoop.hdfs.RemotePeerFactory; 49 import 
org.apache.hadoop.fs.StorageType; 50 import org.apache.hadoop.hdfs.net.Peer; 51 import org.apache.hadoop.hdfs.net.TcpPeerServer; 52 import org.apache.hadoop.hdfs.protocol.Block; 53 import org.apache.hadoop.hdfs.protocol.DatanodeID; 54 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 55 import org.apache.hadoop.hdfs.protocol.DirectoryListing; 56 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 57 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 58 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 59 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 60 import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 61 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; 62 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; 63 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 64 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 65 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; 66 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; 67 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; 68 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; 69 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; 70 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; 71 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; 72 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; 73 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 74 import org.apache.hadoop.net.NetUtils; 75 import org.apache.hadoop.net.NetworkTopology; 76 import org.apache.hadoop.net.NodeBase; 77 import org.apache.hadoop.security.AccessControlException; 78 import org.apache.hadoop.security.UserGroupInformation; 79 import org.apache.hadoop.security.token.Token; 80 import org.apache.hadoop.util.Time; 81 82 import 
com.google.common.annotations.VisibleForTesting; 83 84 /** 85 * This class provides rudimentary checking of DFS volumes for errors and 86 * sub-optimal conditions. 87 * <p>The tool scans all files and directories, starting from an indicated 88 * root path. The following abnormal conditions are detected and handled:</p> 89 * <ul> 90 * <li>files with blocks that are completely missing from all datanodes.<br/> 91 * In this case the tool can perform one of the following actions: 92 * <ul> 93 * <li>none ({@link #FIXING_NONE})</li> 94 * <li>move corrupted files to /lost+found directory on DFS 95 * ({@link #FIXING_MOVE}). Remaining data blocks are saved as a 96 * block chains, representing longest consecutive series of valid blocks.</li> 97 * <li>delete corrupted files ({@link #FIXING_DELETE})</li> 98 * </ul> 99 * </li> 100 * <li>detect files with under-replicated or over-replicated blocks</li> 101 * </ul> 102 * Additionally, the tool collects a detailed overall DFS statistics, and 103 * optionally can print detailed statistics on block locations and replication 104 * factors of each file. 
105 */ 106 @InterfaceAudience.Private 107 public class NamenodeFsck implements DataEncryptionKeyFactory { 108 public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); 109 110 // return string marking fsck status 111 public static final String CORRUPT_STATUS = "is CORRUPT"; 112 public static final String HEALTHY_STATUS = "is HEALTHY"; 113 public static final String DECOMMISSIONING_STATUS = "is DECOMMISSIONING"; 114 public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED"; 115 public static final String NONEXISTENT_STATUS = "does not exist"; 116 public static final String FAILURE_STATUS = "FAILED"; 117 118 private final NameNode namenode; 119 private final NetworkTopology networktopology; 120 private final int totalDatanodes; 121 private final InetAddress remoteAddress; 122 123 private String lostFound = null; 124 private boolean lfInited = false; 125 private boolean lfInitedOk = false; 126 private boolean showFiles = false; 127 private boolean showOpenFiles = false; 128 private boolean showBlocks = false; 129 private boolean showLocations = false; 130 private boolean showRacks = false; 131 private boolean showStoragePolcies = false; 132 private boolean showCorruptFileBlocks = false; 133 134 /** 135 * True if we encountered an internal error during FSCK, such as not being 136 * able to delete a corrupt file. 137 */ 138 private boolean internalError = false; 139 140 /** 141 * True if the user specified the -move option. 142 * 143 * Whe this option is in effect, we will copy salvaged blocks into the lost 144 * and found. */ 145 private boolean doMove = false; 146 147 /** 148 * True if the user specified the -delete option. 149 * 150 * Whe this option is in effect, we will delete corrupted files. 
151 */ 152 private boolean doDelete = false; 153 154 String path = "/"; 155 156 private String blockIds = null; 157 158 // We return back N files that are corrupt; the list of files returned is 159 // ordered by block id; to allow continuation support, pass in the last block 160 // # from previous call 161 private final String[] currentCookie = new String[] { null }; 162 163 private final Configuration conf; 164 private final PrintWriter out; 165 private List<String> snapshottableDirs = null; 166 167 private final BlockPlacementPolicy bpPolicy; 168 private StoragePolicySummary storageTypeSummary = null; 169 170 /** 171 * Filesystem checker. 172 * @param conf configuration (namenode config) 173 * @param namenode namenode that this fsck is going to use 174 * @param pmap key=value[] map passed to the http servlet as url parameters 175 * @param out output stream to write the fsck output 176 * @param totalDatanodes number of live datanodes 177 * @param remoteAddress source address of the fsck request 178 */ NamenodeFsck(Configuration conf, NameNode namenode, NetworkTopology networktopology, Map<String,String[]> pmap, PrintWriter out, int totalDatanodes, InetAddress remoteAddress)179 NamenodeFsck(Configuration conf, NameNode namenode, 180 NetworkTopology networktopology, 181 Map<String,String[]> pmap, PrintWriter out, 182 int totalDatanodes, InetAddress remoteAddress) { 183 this.conf = conf; 184 this.namenode = namenode; 185 this.networktopology = networktopology; 186 this.out = out; 187 this.totalDatanodes = totalDatanodes; 188 this.remoteAddress = remoteAddress; 189 this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null, 190 networktopology, 191 namenode.getNamesystem().getBlockManager().getDatanodeManager() 192 .getHost2DatanodeMap()); 193 194 for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) { 195 String key = it.next(); 196 if (key.equals("path")) { this.path = pmap.get("path")[0]; } 197 else if (key.equals("move")) { this.doMove = true; } 198 
else if (key.equals("delete")) { this.doDelete = true; } 199 else if (key.equals("files")) { this.showFiles = true; } 200 else if (key.equals("blocks")) { this.showBlocks = true; } 201 else if (key.equals("locations")) { this.showLocations = true; } 202 else if (key.equals("racks")) { this.showRacks = true; } 203 else if (key.equals("storagepolicies")) { this.showStoragePolcies = true; } 204 else if (key.equals("openforwrite")) {this.showOpenFiles = true; } 205 else if (key.equals("listcorruptfileblocks")) { 206 this.showCorruptFileBlocks = true; 207 } else if (key.equals("startblockafter")) { 208 this.currentCookie[0] = pmap.get("startblockafter")[0]; 209 } else if (key.equals("includeSnapshots")) { 210 this.snapshottableDirs = new ArrayList<String>(); 211 } else if (key.equals("blockId")) { 212 this.blockIds = pmap.get("blockId")[0]; 213 } 214 } 215 } 216 217 /** 218 * Check block information given a blockId number 219 * 220 */ blockIdCK(String blockId)221 public void blockIdCK(String blockId) { 222 223 if(blockId == null) { 224 out.println("Please provide valid blockId!"); 225 return; 226 } 227 228 BlockManager bm = namenode.getNamesystem().getBlockManager(); 229 try { 230 //get blockInfo 231 Block block = new Block(Block.getBlockId(blockId)); 232 //find which file this block belongs to 233 BlockInfoContiguous blockInfo = bm.getStoredBlock(block); 234 if(blockInfo == null) { 235 out.println("Block "+ blockId +" " + NONEXISTENT_STATUS); 236 LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS); 237 return; 238 } 239 BlockCollection bc = bm.getBlockCollection(blockInfo); 240 INode iNode = (INode) bc; 241 NumberReplicas numberReplicas= bm.countNodes(block); 242 out.println("Block Id: " + blockId); 243 out.println("Block belongs to: "+iNode.getFullPathName()); 244 out.println("No. of Expected Replica: " + bc.getBlockReplication()); 245 out.println("No. of live Replica: " + numberReplicas.liveReplicas()); 246 out.println("No. 
of excess Replica: " + numberReplicas.excessReplicas()); 247 out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes()); 248 out.println("No. of decommission Replica: " 249 + numberReplicas.decommissionedReplicas()); 250 out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas()); 251 //record datanodes that have corrupted block replica 252 Collection<DatanodeDescriptor> corruptionRecord = null; 253 if (bm.getCorruptReplicas(block) != null) { 254 corruptionRecord = bm.getCorruptReplicas(block); 255 } 256 257 //report block replicas status on datanodes 258 for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) { 259 DatanodeDescriptor dn = blockInfo.getDatanode(idx); 260 out.print("Block replica on datanode/rack: " + dn.getHostName() + 261 dn.getNetworkLocation() + " "); 262 if (corruptionRecord != null && corruptionRecord.contains(dn)) { 263 out.print(CORRUPT_STATUS+"\t ReasonCode: "+ 264 bm.getCorruptReason(block,dn)); 265 } else if (dn.isDecommissioned() ){ 266 out.print(DECOMMISSIONED_STATUS); 267 } else if (dn.isDecommissionInProgress()) { 268 out.print(DECOMMISSIONING_STATUS); 269 } else { 270 out.print(HEALTHY_STATUS); 271 } 272 out.print("\n"); 273 } 274 } catch (Exception e){ 275 String errMsg = "Fsck on blockId '" + blockId; 276 LOG.warn(errMsg, e); 277 out.println(e.getMessage()); 278 out.print("\n\n" + errMsg); 279 LOG.warn("Error in looking up block", e); 280 } 281 } 282 283 /** 284 * Check files on DFS, starting from the indicated path. 
285 */ fsck()286 public void fsck() { 287 final long startTime = Time.monotonicNow(); 288 try { 289 if(blockIds != null) { 290 String[] blocks = blockIds.split(" "); 291 StringBuilder sb = new StringBuilder(); 292 sb.append("FSCK started by " + 293 UserGroupInformation.getCurrentUser() + " from " + 294 remoteAddress + " at " + new Date()); 295 out.println(sb.toString()); 296 sb.append(" for blockIds: \n"); 297 for (String blk: blocks) { 298 if(blk == null || !blk.contains(Block.BLOCK_FILE_PREFIX)) { 299 out.println("Incorrect blockId format: " + blk); 300 continue; 301 } 302 out.print("\n"); 303 blockIdCK(blk); 304 sb.append(blk + "\n"); 305 } 306 LOG.info(sb.toString()); 307 namenode.getNamesystem().logFsckEvent("/", remoteAddress); 308 out.flush(); 309 return; 310 } 311 312 String msg = "FSCK started by " + UserGroupInformation.getCurrentUser() 313 + " from " + remoteAddress + " for path " + path + " at " + new Date(); 314 LOG.info(msg); 315 out.println(msg); 316 namenode.getNamesystem().logFsckEvent(path, remoteAddress); 317 318 if (snapshottableDirs != null) { 319 SnapshottableDirectoryStatus[] snapshotDirs = namenode.getRpcServer() 320 .getSnapshottableDirListing(); 321 if (snapshotDirs != null) { 322 for (SnapshottableDirectoryStatus dir : snapshotDirs) { 323 snapshottableDirs.add(dir.getFullPath().toString()); 324 } 325 } 326 } 327 328 final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path); 329 if (file != null) { 330 331 if (showCorruptFileBlocks) { 332 listCorruptFileBlocks(); 333 return; 334 } 335 336 if (this.showStoragePolcies) { 337 storageTypeSummary = new StoragePolicySummary( 338 namenode.getNamesystem().getBlockManager().getStoragePolicies()); 339 } 340 341 Result res = new Result(conf); 342 343 check(path, file, res); 344 345 out.println(res); 346 out.println(" Number of data-nodes:\t\t" + totalDatanodes); 347 out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks()); 348 349 if (this.showStoragePolcies) { 350 
out.print(storageTypeSummary.toString()); 351 } 352 353 out.println("FSCK ended at " + new Date() + " in " 354 + (Time.monotonicNow() - startTime + " milliseconds")); 355 356 // If there were internal errors during the fsck operation, we want to 357 // return FAILURE_STATUS, even if those errors were not immediately 358 // fatal. Otherwise many unit tests will pass even when there are bugs. 359 if (internalError) { 360 throw new IOException("fsck encountered internal errors!"); 361 } 362 363 // DFSck client scans for the string HEALTHY/CORRUPT to check the status 364 // of file system and return appropriate code. Changing the output 365 // string might break testcases. Also note this must be the last line 366 // of the report. 367 if (res.isHealthy()) { 368 out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS); 369 } else { 370 out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS); 371 } 372 373 } else { 374 out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS); 375 } 376 } catch (Exception e) { 377 String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS; 378 LOG.warn(errMsg, e); 379 out.println("FSCK ended at " + new Date() + " in " 380 + (Time.monotonicNow() - startTime + " milliseconds")); 381 out.println(e.getMessage()); 382 out.print("\n\n" + errMsg); 383 } finally { 384 out.close(); 385 } 386 } 387 listCorruptFileBlocks()388 private void listCorruptFileBlocks() throws IOException { 389 Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode. 
390 getNamesystem().listCorruptFileBlocks(path, currentCookie); 391 int numCorruptFiles = corruptFiles.size(); 392 String filler; 393 if (numCorruptFiles > 0) { 394 filler = Integer.toString(numCorruptFiles); 395 } else if (currentCookie[0].equals("0")) { 396 filler = "no"; 397 } else { 398 filler = "no more"; 399 } 400 out.println("Cookie:\t" + currentCookie[0]); 401 for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) { 402 out.println(c.toString()); 403 } 404 out.println("\n\nThe filesystem under path '" + path + "' has " + filler 405 + " CORRUPT files"); 406 out.println(); 407 } 408 409 @VisibleForTesting check(String parent, HdfsFileStatus file, Result res)410 void check(String parent, HdfsFileStatus file, Result res) throws IOException { 411 String path = file.getFullName(parent); 412 boolean isOpen = false; 413 414 if (file.isDir()) { 415 if (snapshottableDirs != null && snapshottableDirs.contains(path)) { 416 String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path 417 + Path.SEPARATOR) 418 + HdfsConstants.DOT_SNAPSHOT_DIR; 419 HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo( 420 snapshotPath); 421 check(snapshotPath, snapshotFileInfo, res); 422 } 423 byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME; 424 DirectoryListing thisListing; 425 if (showFiles) { 426 out.println(path + " <dir>"); 427 } 428 res.totalDirs++; 429 do { 430 assert lastReturnedName != null; 431 thisListing = namenode.getRpcServer().getListing( 432 path, lastReturnedName, false); 433 if (thisListing == null) { 434 return; 435 } 436 HdfsFileStatus[] files = thisListing.getPartialListing(); 437 for (int i = 0; i < files.length; i++) { 438 check(path, files[i], res); 439 } 440 lastReturnedName = thisListing.getLastName(); 441 } while (thisListing.hasMore()); 442 return; 443 } 444 if (file.isSymlink()) { 445 if (showFiles) { 446 out.println(path + " <symlink>"); 447 } 448 res.totalSymlinks++; 449 return; 450 } 451 long fileLen = file.getLen(); 452 // Get 
block locations without updating the file access time 453 // and without block access tokens 454 LocatedBlocks blocks = null; 455 FSNamesystem fsn = namenode.getNamesystem(); 456 fsn.readLock(); 457 try { 458 blocks = fsn.getBlockLocations( 459 fsn.getPermissionChecker(), path, 0, fileLen, false, false) 460 .blocks; 461 } catch (FileNotFoundException fnfe) { 462 blocks = null; 463 } finally { 464 fsn.readUnlock(); 465 } 466 if (blocks == null) { // the file is deleted 467 return; 468 } 469 isOpen = blocks.isUnderConstruction(); 470 if (isOpen && !showOpenFiles) { 471 // We collect these stats about open files to report with default options 472 res.totalOpenFilesSize += fileLen; 473 res.totalOpenFilesBlocks += blocks.locatedBlockCount(); 474 res.totalOpenFiles++; 475 return; 476 } 477 res.totalFiles++; 478 res.totalSize += fileLen; 479 res.totalBlocks += blocks.locatedBlockCount(); 480 if (showOpenFiles && isOpen) { 481 out.print(path + " " + fileLen + " bytes, " + 482 blocks.locatedBlockCount() + " block(s), OPENFORWRITE: "); 483 } else if (showFiles) { 484 out.print(path + " " + fileLen + " bytes, " + 485 blocks.locatedBlockCount() + " block(s): "); 486 } else { 487 out.print('.'); 488 } 489 if (res.totalFiles % 100 == 0) { out.println(); out.flush(); } 490 int missing = 0; 491 int corrupt = 0; 492 long missize = 0; 493 int underReplicatedPerFile = 0; 494 int misReplicatedPerFile = 0; 495 StringBuilder report = new StringBuilder(); 496 int i = 0; 497 for (LocatedBlock lBlk : blocks.getLocatedBlocks()) { 498 ExtendedBlock block = lBlk.getBlock(); 499 boolean isCorrupt = lBlk.isCorrupt(); 500 String blkName = block.toString(); 501 DatanodeInfo[] locs = lBlk.getLocations(); 502 NumberReplicas numberReplicas = 503 namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock()); 504 int liveReplicas = numberReplicas.liveReplicas(); 505 res.totalReplicas += liveReplicas; 506 short targetFileReplication = file.getReplication(); 507 res.numExpectedReplicas += 
targetFileReplication; 508 if(liveReplicas < res.minReplication){ 509 res.numUnderMinReplicatedBlocks++; 510 } 511 if (liveReplicas > targetFileReplication) { 512 res.excessiveReplicas += (liveReplicas - targetFileReplication); 513 res.numOverReplicatedBlocks += 1; 514 } 515 //keep track of storage tier counts 516 if (this.showStoragePolcies && lBlk.getStorageTypes() != null) { 517 StorageType[] storageTypes = lBlk.getStorageTypes(); 518 storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length), 519 fsn.getBlockManager().getStoragePolicy(file.getStoragePolicy())); 520 } 521 // Check if block is Corrupt 522 if (isCorrupt) { 523 corrupt++; 524 res.corruptBlocks++; 525 out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() + 526 " block " + block.getBlockName()+"\n"); 527 } 528 if (liveReplicas >= res.minReplication) 529 res.numMinReplicatedBlocks++; 530 if (liveReplicas < targetFileReplication && liveReplicas > 0) { 531 res.missingReplicas += (targetFileReplication - liveReplicas); 532 res.numUnderReplicatedBlocks += 1; 533 underReplicatedPerFile++; 534 if (!showFiles) { 535 out.print("\n" + path + ": "); 536 } 537 out.println(" Under replicated " + block + 538 ". Target Replicas is " + 539 targetFileReplication + " but found " + 540 liveReplicas + " replica(s)."); 541 } 542 543 // count mis replicated blocks 544 BlockPlacementStatus blockPlacementStatus = bpPolicy 545 .verifyBlockPlacement(path, lBlk, targetFileReplication); 546 if (!blockPlacementStatus.isPlacementPolicySatisfied()) { 547 res.numMisReplicatedBlocks++; 548 misReplicatedPerFile++; 549 if (!showFiles) { 550 if(underReplicatedPerFile == 0) 551 out.println(); 552 out.print(path + ": "); 553 } 554 out.println(" Replica placement policy is violated for " + 555 block + ". " + blockPlacementStatus.getErrorDescription()); 556 } 557 report.append(i + ". 
" + blkName + " len=" + block.getNumBytes()); 558 if (liveReplicas == 0) { 559 report.append(" MISSING!"); 560 res.addMissing(block.toString(), block.getNumBytes()); 561 missing++; 562 missize += block.getNumBytes(); 563 } else { 564 report.append(" repl=" + liveReplicas); 565 if (showLocations || showRacks) { 566 StringBuilder sb = new StringBuilder("["); 567 for (int j = 0; j < locs.length; j++) { 568 if (j > 0) { sb.append(", "); } 569 if (showRacks) 570 sb.append(NodeBase.getPath(locs[j])); 571 else 572 sb.append(locs[j]); 573 } 574 sb.append(']'); 575 report.append(" " + sb.toString()); 576 } 577 } 578 report.append('\n'); 579 i++; 580 } 581 if ((missing > 0) || (corrupt > 0)) { 582 if (!showFiles && (missing > 0)) { 583 out.print("\n" + path + ": MISSING " + missing 584 + " blocks of total size " + missize + " B."); 585 } 586 res.corruptFiles++; 587 if (isOpen) { 588 LOG.info("Fsck: ignoring open file " + path); 589 } else { 590 if (doMove) copyBlocksToLostFound(parent, file, blocks); 591 if (doDelete) deleteCorruptedFile(path); 592 } 593 } 594 if (showFiles) { 595 if (missing > 0) { 596 out.print(" MISSING " + missing + " blocks of total size " + missize + " B\n"); 597 } else if (underReplicatedPerFile == 0 && misReplicatedPerFile == 0) { 598 out.print(" OK\n"); 599 } 600 if (showBlocks) { 601 out.print(report.toString() + "\n"); 602 } 603 } 604 } 605 deleteCorruptedFile(String path)606 private void deleteCorruptedFile(String path) { 607 try { 608 namenode.getRpcServer().delete(path, true); 609 LOG.info("Fsck: deleted corrupt file " + path); 610 } catch (Exception e) { 611 LOG.error("Fsck: error deleting corrupted file " + path, e); 612 internalError = true; 613 } 614 } 615 hdfsPathExists(String path)616 boolean hdfsPathExists(String path) 617 throws AccessControlException, UnresolvedLinkException, IOException { 618 try { 619 HdfsFileStatus hfs = namenode.getRpcServer().getFileInfo(path); 620 return (hfs != null); 621 } catch (FileNotFoundException e) { 622 
return false; 623 } 624 } 625 copyBlocksToLostFound(String parent, HdfsFileStatus file, LocatedBlocks blocks)626 private void copyBlocksToLostFound(String parent, HdfsFileStatus file, 627 LocatedBlocks blocks) throws IOException { 628 final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf); 629 final String fullName = file.getFullName(parent); 630 OutputStream fos = null; 631 try { 632 if (!lfInited) { 633 lostFoundInit(dfs); 634 } 635 if (!lfInitedOk) { 636 throw new IOException("failed to initialize lost+found"); 637 } 638 String target = lostFound + fullName; 639 if (hdfsPathExists(target)) { 640 LOG.warn("Fsck: can't copy the remains of " + fullName + " to " + 641 "lost+found, because " + target + " already exists."); 642 return; 643 } 644 if (!namenode.getRpcServer().mkdirs( 645 target, file.getPermission(), true)) { 646 throw new IOException("failed to create directory " + target); 647 } 648 // create chains 649 int chain = 0; 650 boolean copyError = false; 651 for (LocatedBlock lBlk : blocks.getLocatedBlocks()) { 652 LocatedBlock lblock = lBlk; 653 DatanodeInfo[] locs = lblock.getLocations(); 654 if (locs == null || locs.length == 0) { 655 if (fos != null) { 656 fos.flush(); 657 fos.close(); 658 fos = null; 659 } 660 continue; 661 } 662 if (fos == null) { 663 fos = dfs.create(target + "/" + chain, true); 664 chain++; 665 } 666 667 // copy the block. It's a pity it's not abstracted from DFSInputStream ... 
668 try { 669 copyBlock(dfs, lblock, fos); 670 } catch (Exception e) { 671 LOG.error("Fsck: could not copy block " + lblock.getBlock() + 672 " to " + target, e); 673 fos.flush(); 674 fos.close(); 675 fos = null; 676 internalError = true; 677 copyError = true; 678 } 679 } 680 if (copyError) { 681 LOG.warn("Fsck: there were errors copying the remains of the " + 682 "corrupted file " + fullName + " to /lost+found"); 683 } else { 684 LOG.info("Fsck: copied the remains of the corrupted file " + 685 fullName + " to /lost+found"); 686 } 687 } catch (Exception e) { 688 LOG.error("copyBlocksToLostFound: error processing " + fullName, e); 689 internalError = true; 690 } finally { 691 if (fos != null) fos.close(); 692 dfs.close(); 693 } 694 } 695 696 /* 697 * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is 698 * bad. Both places should be refactored to provide a method to copy blocks 699 * around. 700 */ copyBlock(final DFSClient dfs, LocatedBlock lblock, OutputStream fos)701 private void copyBlock(final DFSClient dfs, LocatedBlock lblock, 702 OutputStream fos) throws Exception { 703 int failures = 0; 704 InetSocketAddress targetAddr = null; 705 TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>(); 706 BlockReader blockReader = null; 707 ExtendedBlock block = lblock.getBlock(); 708 709 while (blockReader == null) { 710 DatanodeInfo chosenNode; 711 712 try { 713 chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes); 714 targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr()); 715 } catch (IOException ie) { 716 if (failures >= DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT) { 717 throw new IOException("Could not obtain block " + lblock, ie); 718 } 719 LOG.info("Could not obtain block from any node: " + ie); 720 try { 721 Thread.sleep(10000); 722 } catch (InterruptedException iex) { 723 } 724 deadNodes.clear(); 725 failures++; 726 continue; 727 } 728 try { 729 String file = 
BlockReaderFactory.getFileName(targetAddr, 730 block.getBlockPoolId(), block.getBlockId()); 731 blockReader = new BlockReaderFactory(dfs.getConf()). 732 setFileName(file). 733 setBlock(block). 734 setBlockToken(lblock.getBlockToken()). 735 setStartOffset(0). 736 setLength(-1). 737 setVerifyChecksum(true). 738 setClientName("fsck"). 739 setDatanodeInfo(chosenNode). 740 setInetSocketAddress(targetAddr). 741 setCachingStrategy(CachingStrategy.newDropBehind()). 742 setClientCacheContext(dfs.getClientContext()). 743 setConfiguration(namenode.conf). 744 setRemotePeerFactory(new RemotePeerFactory() { 745 @Override 746 public Peer newConnectedPeer(InetSocketAddress addr, 747 Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) 748 throws IOException { 749 Peer peer = null; 750 Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); 751 try { 752 s.connect(addr, HdfsServerConstants.READ_TIMEOUT); 753 s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); 754 peer = TcpPeerServer.peerFromSocketAndKey( 755 dfs.getSaslDataTransferClient(), s, NamenodeFsck.this, 756 blockToken, datanodeId); 757 } finally { 758 if (peer == null) { 759 IOUtils.closeQuietly(s); 760 } 761 } 762 return peer; 763 } 764 }). 
765 build(); 766 } catch (IOException ex) { 767 // Put chosen node into dead list, continue 768 LOG.info("Failed to connect to " + targetAddr + ":" + ex); 769 deadNodes.add(chosenNode); 770 } 771 } 772 byte[] buf = new byte[1024]; 773 int cnt = 0; 774 boolean success = true; 775 long bytesRead = 0; 776 try { 777 while ((cnt = blockReader.read(buf, 0, buf.length)) > 0) { 778 fos.write(buf, 0, cnt); 779 bytesRead += cnt; 780 } 781 if ( bytesRead != block.getNumBytes() ) { 782 throw new IOException("Recorded block size is " + block.getNumBytes() + 783 ", but datanode returned " +bytesRead+" bytes"); 784 } 785 } catch (Exception e) { 786 LOG.error("Error reading block", e); 787 success = false; 788 } finally { 789 blockReader.close(); 790 } 791 if (!success) { 792 throw new Exception("Could not copy block data for " + lblock.getBlock()); 793 } 794 } 795 796 @Override newDataEncryptionKey()797 public DataEncryptionKey newDataEncryptionKey() throws IOException { 798 return namenode.getRpcServer().getDataEncryptionKey(); 799 } 800 801 /* 802 * XXX (ab) See comment above for copyBlock(). 803 * 804 * Pick the best node from which to stream the data. 805 * That's the local one, if available. 
806 */ bestNode(DFSClient dfs, DatanodeInfo[] nodes, TreeSet<DatanodeInfo> deadNodes)807 private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes, 808 TreeSet<DatanodeInfo> deadNodes) throws IOException { 809 if ((nodes == null) || 810 (nodes.length - deadNodes.size() < 1)) { 811 throw new IOException("No live nodes contain current block"); 812 } 813 DatanodeInfo chosenNode; 814 do { 815 chosenNode = nodes[DFSUtil.getRandom().nextInt(nodes.length)]; 816 } while (deadNodes.contains(chosenNode)); 817 return chosenNode; 818 } 819 lostFoundInit(DFSClient dfs)820 private void lostFoundInit(DFSClient dfs) { 821 lfInited = true; 822 try { 823 String lfName = "/lost+found"; 824 825 final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName); 826 if (lfStatus == null) { // not exists 827 lfInitedOk = dfs.mkdirs(lfName, null, true); 828 lostFound = lfName; 829 } else if (!lfStatus.isDir()) { // exists but not a directory 830 LOG.warn("Cannot use /lost+found : a regular file with this name exists."); 831 lfInitedOk = false; 832 } else { // exists and is a directory 833 lostFound = lfName; 834 lfInitedOk = true; 835 } 836 } catch (Exception e) { 837 e.printStackTrace(); 838 lfInitedOk = false; 839 } 840 if (lostFound == null) { 841 LOG.warn("Cannot initialize /lost+found ."); 842 lfInitedOk = false; 843 internalError = true; 844 } 845 } 846 847 /** 848 * FsckResult of checking, plus overall DFS statistics. 
*/
  @VisibleForTesting
  static class Result {
    /** Ids of the blocks registered through {@link #addMissing(String, long)}. */
    final List<String> missingIds = new ArrayList<String>();
    /** Sum of the sizes passed to {@link #addMissing(String, long)}. */
    long missingSize = 0L;
    long corruptFiles = 0L;
    long corruptBlocks = 0L;
    long excessiveReplicas = 0L;
    long missingReplicas = 0L;
    long numUnderMinReplicatedBlocks = 0L;
    long numOverReplicatedBlocks = 0L;
    long numUnderReplicatedBlocks = 0L;
    long numMisReplicatedBlocks = 0L;  // blocks that do not satisfy block placement policy
    long numMinReplicatedBlocks = 0L;  // minimally replicated blocks
    long totalBlocks = 0L;
    long numExpectedReplicas = 0L;
    long totalOpenFilesBlocks = 0L;
    long totalFiles = 0L;
    long totalOpenFiles = 0L;
    long totalDirs = 0L;
    long totalSymlinks = 0L;
    long totalSize = 0L;
    long totalOpenFilesSize = 0L;
    long totalReplicas = 0L;

    /** Value of {@code dfs.replication} read at construction time. */
    final short replication;
    /** Value of the minimum-replication setting read at construction time. */
    final int minReplication;

    /**
     * Captures the replication settings that are echoed in the report.
     *
     * @param conf configuration to read the replication keys from
     */
    Result(Configuration conf) {
      this.replication = (short) conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
      // The field is an int, so the value is stored as read rather than being
      // narrowed through a short first.
      this.minReplication = conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
    }

    /**
     * DFS is considered healthy if there are no missing blocks and no
     * corrupt blocks.
     */
    boolean isHealthy() {
      return missingIds.isEmpty() && corruptBlocks == 0;
    }

    /** Add a missing block name, plus its size. */
    void addMissing(String id, long size) {
      missingIds.add(id);
      missingSize += size;
    }

    /** Return the actual replication factor, or 0 when no block was counted. */
    float getReplicationFactor() {
      if (totalBlocks == 0) {
        return 0.0f;
      }
      return (float) (totalReplicas) / (float) totalBlocks;
    }

    /**
     * Appends {@code " (<count * 100 / total> %)"} using float division.
     * Callers must make sure {@code total} is non-zero.
     */
    private static void appendPercentage(StringBuilder out, long count,
        long total) {
      out.append(" (").append((float) (count * 100) / (float) total)
          .append(" %)");
    }

    /**
     * Renders the multi-line summary: health status, totals, the
     * corruption / under-min-replication banner when applicable, and the
     * per-category replication statistics.
     */
    @Override
    public String toString() {
      StringBuilder res = new StringBuilder();
      res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT"))
          .append("\n Total size:\t").append(totalSize).append(" B");
      if (totalOpenFilesSize != 0) {
        res.append(" (Total open files size: ").append(totalOpenFilesSize)
            .append(" B)");
      }
      res.append("\n Total dirs:\t").append(totalDirs)
          .append("\n Total files:\t").append(totalFiles);
      res.append("\n Total symlinks:\t\t").append(totalSymlinks);
      if (totalOpenFiles != 0) {
        res.append(" (Files currently being written: ").append(totalOpenFiles)
            .append(")");
      }
      res.append("\n Total blocks (validated):\t").append(totalBlocks);
      if (totalBlocks > 0) {
        res.append(" (avg. block size ").append((totalSize / totalBlocks))
            .append(" B)");
      }
      if (totalOpenFilesBlocks != 0) {
        res.append(" (Total open file blocks (not validated): ")
            .append(totalOpenFilesBlocks).append(")");
      }

      if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) {
        res.append("\n ********************************");
        if (numUnderMinReplicatedBlocks > 0) {
          res.append("\n UNDER MIN REPL'D BLOCKS:\t")
              .append(numUnderMinReplicatedBlocks);
          if (totalBlocks > 0) {
            appendPercentage(res, numUnderMinReplicatedBlocks, totalBlocks);
          }
          res.append("\n ")
              .append(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY + ":\t")
              .append(minReplication);
        }
        if (corruptFiles > 0) {
          res.append("\n CORRUPT FILES:\t").append(corruptFiles);
          if (missingSize > 0) {
            res.append("\n MISSING BLOCKS:\t").append(missingIds.size())
                .append("\n MISSING SIZE:\t\t").append(missingSize)
                .append(" B");
          }
          if (corruptBlocks > 0) {
            res.append("\n CORRUPT BLOCKS: \t").append(corruptBlocks);
          }
        }
        res.append("\n ********************************");
      }

      res.append("\n Minimally replicated blocks:\t")
          .append(numMinReplicatedBlocks);
      if (totalBlocks > 0) {
        appendPercentage(res, numMinReplicatedBlocks, totalBlocks);
      }
      res.append("\n Over-replicated blocks:\t")
          .append(numOverReplicatedBlocks);
      if (totalBlocks > 0) {
        appendPercentage(res, numOverReplicatedBlocks, totalBlocks);
      }
      res.append("\n Under-replicated blocks:\t")
          .append(numUnderReplicatedBlocks);
      if (totalBlocks > 0) {
        appendPercentage(res, numUnderReplicatedBlocks, totalBlocks);
      }
      res.append("\n Mis-replicated blocks:\t\t")
          .append(numMisReplicatedBlocks);
      if (totalBlocks > 0) {
        appendPercentage(res, numMisReplicatedBlocks, totalBlocks);
      }
      res.append("\n Default replication factor:\t").append(replication)
          .append("\n Average block replication:\t")
          .append(getReplicationFactor())
          .append("\n Corrupt blocks:\t\t").append(corruptBlocks)
          .append("\n Missing replicas:\t\t").append(missingReplicas);
      // The ratio is taken against numExpectedReplicas, so that is the value
      // that has to be non-zero; otherwise the float division would print
      // "NaN %" or "Infinity %". The totalReplicas condition is kept so the
      // percentage shows up in exactly the same situations as before.
      if (totalReplicas > 0 && numExpectedReplicas > 0) {
        appendPercentage(res, missingReplicas, numExpectedReplicas);
      }
      return res.toString();
    }
  }
}