1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; 19 20 import java.io.BufferedOutputStream; 21 import java.io.DataOutputStream; 22 import java.io.EOFException; 23 import java.io.File; 24 import java.io.FileDescriptor; 25 import java.io.FileInputStream; 26 import java.io.FileNotFoundException; 27 import java.io.FileOutputStream; 28 import java.io.IOException; 29 import java.io.InputStream; 30 import java.io.RandomAccessFile; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.ClosedChannelException; 33 import java.nio.channels.FileChannel; 34 import java.util.ArrayList; 35 import java.util.Collection; 36 import java.util.HashMap; 37 import java.util.HashSet; 38 import java.util.Iterator; 39 import java.util.List; 40 import java.util.Map; 41 import java.util.Set; 42 import java.util.concurrent.ConcurrentHashMap; 43 import java.util.*; 44 import java.util.concurrent.Executor; 45 46 import javax.management.NotCompliantMBeanException; 47 import javax.management.ObjectName; 48 import javax.management.StandardMBean; 49 50 import com.google.common.annotations.VisibleForTesting; 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 import org.apache.hadoop.classification.InterfaceAudience; 54 import org.apache.hadoop.conf.Configuration; 55 import org.apache.hadoop.fs.FileStatus; 56 import org.apache.hadoop.fs.FileSystem; 57 import org.apache.hadoop.fs.LocalFileSystem; 58 import org.apache.hadoop.fs.permission.FsPermission; 59 import org.apache.hadoop.fs.Path; 60 import org.apache.hadoop.fs.StorageType; 61 import org.apache.hadoop.hdfs.DFSConfigKeys; 62 import org.apache.hadoop.hdfs.ExtendedBlockId; 63 import org.apache.hadoop.hdfs.protocol.Block; 64 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; 65 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; 66 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 67 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; 68 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 69 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; 70 import org.apache.hadoop.hdfs.server.common.GenerationStamp; 71 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; 72 import org.apache.hadoop.hdfs.server.common.Storage; 73 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; 74 import org.apache.hadoop.hdfs.server.datanode.BlockScanner; 75 import org.apache.hadoop.hdfs.server.datanode.DataNode; 76 import org.apache.hadoop.hdfs.server.datanode.DataStorage; 77 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; 78 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; 79 import 
org.apache.hadoop.hdfs.server.datanode.Replica; 80 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; 81 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; 82 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; 83 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; 84 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; 85 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; 86 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; 87 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; 88 import org.apache.hadoop.hdfs.server.datanode.StorageLocation; 89 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; 90 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 91 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; 92 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; 93 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; 94 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; 95 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; 96 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; 97 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; 98 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; 99 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; 100 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; 101 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; 102 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; 103 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; 104 import org.apache.hadoop.hdfs.server.protocol.StorageReport; 105 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; 106 import org.apache.hadoop.io.IOUtils; 107 import org.apache.hadoop.io.MultipleIOException; 108 import org.apache.hadoop.io.nativeio.NativeIO; 109 import org.apache.hadoop.metrics2.util.MBeans; 110 import org.apache.hadoop.util.Daemon; 111 import org.apache.hadoop.util.DataChecksum; 112 import org.apache.hadoop.util.DiskChecker.DiskErrorException; 113 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; 114 import org.apache.hadoop.util.ReflectionUtils; 115 import org.apache.hadoop.util.Time; 116 117 import com.google.common.base.Preconditions; 118 import com.google.common.collect.Lists; 119 import com.google.common.collect.Sets; 120 121 /************************************************** 122 * FSDataset manages a set of data blocks. Each block 123 * has a unique name and an extent on disk. 
124 * 125 ***************************************************/ 126 @InterfaceAudience.Private 127 class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { 128 static final Log LOG = LogFactory.getLog(FsDatasetImpl.class); 129 private final static boolean isNativeIOAvailable; 130 static { 131 isNativeIOAvailable = NativeIO.isAvailable(); 132 if (Path.WINDOWS && !isNativeIOAvailable) { 133 LOG.warn("Data node cannot fully support concurrent reading" 134 + " and writing without native code extensions on Windows."); 135 } 136 } 137 138 @Override // FsDatasetSpi getVolumes()139 public List<FsVolumeImpl> getVolumes() { 140 return volumes.getVolumes(); 141 } 142 143 @Override getStorage(final String storageUuid)144 public DatanodeStorage getStorage(final String storageUuid) { 145 return storageMap.get(storageUuid); 146 } 147 148 @Override // FsDatasetSpi getStorageReports(String bpid)149 public StorageReport[] getStorageReports(String bpid) 150 throws IOException { 151 List<StorageReport> reports; 152 synchronized (statsLock) { 153 List<FsVolumeImpl> curVolumes = getVolumes(); 154 reports = new ArrayList<>(curVolumes.size()); 155 for (FsVolumeImpl volume : curVolumes) { 156 try (FsVolumeReference ref = volume.obtainReference()) { 157 StorageReport sr = new StorageReport(volume.toDatanodeStorage(), 158 false, 159 volume.getCapacity(), 160 volume.getDfsUsed(), 161 volume.getAvailable(), 162 volume.getBlockPoolUsed(bpid)); 163 reports.add(sr); 164 } catch (ClosedChannelException e) { 165 continue; 166 } 167 } 168 } 169 170 return reports.toArray(new StorageReport[reports.size()]); 171 } 172 173 @Override getVolume(final ExtendedBlock b)174 public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) { 175 final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); 176 return r != null? (FsVolumeImpl)r.getVolume(): null; 177 } 178 179 @Override // FsDatasetSpi getStoredBlock(String bpid, long blkid)180 public synchronized Block getStoredBlock(String bpid, long blkid) 181 throws IOException { 182 File blockfile = getFile(bpid, blkid, false); 183 if (blockfile == null) { 184 return null; 185 } 186 final File metafile = FsDatasetUtil.findMetaFile(blockfile); 187 final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile); 188 return new Block(blkid, blockfile.length(), gs); 189 } 190 191 192 /** 193 * This should be primarily used for testing. 
194 * @return clone of replica store in datanode memory 195 */ fetchReplicaInfo(String bpid, long blockId)196 ReplicaInfo fetchReplicaInfo(String bpid, long blockId) { 197 ReplicaInfo r = volumeMap.get(bpid, blockId); 198 if(r == null) 199 return null; 200 switch(r.getState()) { 201 case FINALIZED: 202 return new FinalizedReplica((FinalizedReplica)r); 203 case RBW: 204 return new ReplicaBeingWritten((ReplicaBeingWritten)r); 205 case RWR: 206 return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r); 207 case RUR: 208 return new ReplicaUnderRecovery((ReplicaUnderRecovery)r); 209 case TEMPORARY: 210 return new ReplicaInPipeline((ReplicaInPipeline)r); 211 } 212 return null; 213 } 214 215 @Override // FsDatasetSpi getMetaDataInputStream(ExtendedBlock b)216 public LengthInputStream getMetaDataInputStream(ExtendedBlock b) 217 throws IOException { 218 File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp()); 219 if (meta == null || !meta.exists()) { 220 return null; 221 } 222 if (isNativeIOAvailable) { 223 return new LengthInputStream( 224 NativeIO.getShareDeleteFileInputStream(meta), 225 meta.length()); 226 } 227 return new LengthInputStream(new FileInputStream(meta), meta.length()); 228 } 229 230 final DataNode datanode; 231 final DataStorage dataStorage; 232 final FsVolumeList volumes; 233 final Map<String, DatanodeStorage> storageMap; 234 final FsDatasetAsyncDiskService asyncDiskService; 235 final Daemon lazyWriter; 236 final FsDatasetCache cacheManager; 237 private final Configuration conf; 238 private final int validVolsRequired; 239 private volatile boolean fsRunning; 240 241 final ReplicaMap volumeMap; 242 final Map<String, Set<Long>> deletingBlock; 243 final RamDiskReplicaTracker ramDiskReplicaTracker; 244 final RamDiskAsyncLazyPersistService asyncLazyPersistService; 245 246 private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3; 247 248 249 // Used for synchronizing access to usage stats 250 private final Object statsLock = new Object(); 251 252 final LocalFileSystem localFS; 253 254 private boolean blockPinningEnabled; 255 256 /** 257 * An FSDataset has a directory where it loads its data files. 258 */ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf )259 FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf 260 ) throws IOException { 261 this.fsRunning = true; 262 this.datanode = datanode; 263 this.dataStorage = storage; 264 this.conf = conf; 265 // The number of volumes required for operation is the total number 266 // of volumes minus the number of failed volumes we can tolerate. 267 final int volFailuresTolerated = 268 conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 269 DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); 270 271 String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); 272 Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf); 273 List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos( 274 dataLocations, storage); 275 276 int volsConfigured = (dataDirs == null) ? 
0 : dataDirs.length; 277 int volsFailed = volumeFailureInfos.size(); 278 this.validVolsRequired = volsConfigured - volFailuresTolerated; 279 280 if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { 281 throw new DiskErrorException("Invalid volume failure " 282 + " config value: " + volFailuresTolerated); 283 } 284 if (volsFailed > volFailuresTolerated) { 285 throw new DiskErrorException("Too many failed volumes - " 286 + "current valid volumes: " + storage.getNumStorageDirs() 287 + ", volumes configured: " + volsConfigured 288 + ", volumes failed: " + volsFailed 289 + ", volume failures tolerated: " + volFailuresTolerated); 290 } 291 292 storageMap = new ConcurrentHashMap<String, DatanodeStorage>(); 293 volumeMap = new ReplicaMap(this); 294 ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); 295 296 @SuppressWarnings("unchecked") 297 final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = 298 ReflectionUtils.newInstance(conf.getClass( 299 DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, 300 RoundRobinVolumeChoosingPolicy.class, 301 VolumeChoosingPolicy.class), conf); 302 volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), 303 blockChooserImpl); 304 asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); 305 asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); 306 deletingBlock = new HashMap<String, Set<Long>>(); 307 308 for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { 309 addVolume(dataLocations, storage.getStorageDir(idx)); 310 } 311 setupAsyncLazyPersistThreads(); 312 313 cacheManager = new FsDatasetCache(this); 314 315 // Start the lazy writer once we have built the replica maps. 316 lazyWriter = new Daemon(new LazyWriter(conf)); 317 lazyWriter.start(); 318 registerMBean(datanode.getDatanodeUuid()); 319 localFS = FileSystem.getLocal(conf); 320 blockPinningEnabled = conf.getBoolean( 321 DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, 322 DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT); 323 } 324 325 /** 326 * Gets initial volume failure information for all volumes that failed 327 * immediately at startup. The method works by determining the set difference 328 * between all configured storage locations and the actual storage locations in 329 * use after attempting to put all of them into service. 
330 * 331 * @return each storage location that has failed 332 */ getInitialVolumeFailureInfos( Collection<StorageLocation> dataLocations, DataStorage storage)333 private static List<VolumeFailureInfo> getInitialVolumeFailureInfos( 334 Collection<StorageLocation> dataLocations, DataStorage storage) { 335 Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize( 336 dataLocations.size()); 337 for (StorageLocation sl: dataLocations) { 338 failedLocationSet.add(sl.getFile().getAbsolutePath()); 339 } 340 for (Iterator<Storage.StorageDirectory> it = storage.dirIterator(); 341 it.hasNext(); ) { 342 Storage.StorageDirectory sd = it.next(); 343 failedLocationSet.remove(sd.getRoot().getAbsolutePath()); 344 } 345 List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity( 346 failedLocationSet.size()); 347 long failureDate = Time.now(); 348 for (String failedStorageLocation: failedLocationSet) { 349 volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation, 350 failureDate)); 351 } 352 return volumeFailureInfos; 353 } 354 addVolume(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd)355 private void addVolume(Collection<StorageLocation> dataLocations, 356 Storage.StorageDirectory sd) throws IOException { 357 final File dir = sd.getCurrentDir(); 358 final StorageType storageType = 359 getStorageTypeFromLocations(dataLocations, sd.getRoot()); 360 361 // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is 362 // nothing needed to be rolled back to make various data structures, e.g., 363 // storageMap and asyncDiskService, consistent. 364 FsVolumeImpl fsVolume = new FsVolumeImpl( 365 this, sd.getStorageUuid(), dir, this.conf, storageType); 366 FsVolumeReference ref = fsVolume.obtainReference(); 367 ReplicaMap tempVolumeMap = new ReplicaMap(this); 368 fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); 369 370 synchronized (this) { 371 volumeMap.addAll(tempVolumeMap); 372 storageMap.put(sd.getStorageUuid(), 373 new DatanodeStorage(sd.getStorageUuid(), 374 DatanodeStorage.State.NORMAL, 375 storageType)); 376 asyncDiskService.addVolume(sd.getCurrentDir()); 377 volumes.addVolume(ref); 378 } 379 380 LOG.info("Added volume - " + dir + ", StorageType: " + storageType); 381 } 382 383 @VisibleForTesting createFsVolume(String storageUuid, File currentDir, StorageType storageType)384 public FsVolumeImpl createFsVolume(String storageUuid, File currentDir, 385 StorageType storageType) throws IOException { 386 return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType); 387 } 388 389 @Override addVolume(final StorageLocation location, final List<NamespaceInfo> nsInfos)390 public void addVolume(final StorageLocation location, 391 final List<NamespaceInfo> nsInfos) 392 throws IOException { 393 final File dir = location.getFile(); 394 395 // Prepare volume in DataStorage 396 final DataStorage.VolumeBuilder builder; 397 try { 398 builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos); 399 } catch (IOException e) { 400 volumes.addVolumeFailureInfo(new VolumeFailureInfo( 401 location.getFile().getAbsolutePath(), Time.now())); 402 throw e; 403 } 404 405 final Storage.StorageDirectory sd = builder.getStorageDirectory(); 406 407 StorageType storageType = location.getStorageType(); 408 final FsVolumeImpl fsVolume = 409 createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType); 410 final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume); 411 ArrayList<IOException> exceptions = Lists.newArrayList(); 412 
413 for (final NamespaceInfo nsInfo : nsInfos) { 414 String bpid = nsInfo.getBlockPoolID(); 415 try { 416 fsVolume.addBlockPool(bpid, this.conf); 417 fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker); 418 } catch (IOException e) { 419 LOG.warn("Caught exception when adding " + fsVolume + 420 ". Will throw later.", e); 421 exceptions.add(e); 422 } 423 } 424 if (!exceptions.isEmpty()) { 425 try { 426 sd.unlock(); 427 } catch (IOException e) { 428 exceptions.add(e); 429 } 430 throw MultipleIOException.createIOException(exceptions); 431 } 432 433 final FsVolumeReference ref = fsVolume.obtainReference(); 434 setupAsyncLazyPersistThread(fsVolume); 435 436 builder.build(); 437 synchronized (this) { 438 volumeMap.addAll(tempVolumeMap); 439 storageMap.put(sd.getStorageUuid(), 440 new DatanodeStorage(sd.getStorageUuid(), 441 DatanodeStorage.State.NORMAL, 442 storageType)); 443 asyncDiskService.addVolume(sd.getCurrentDir()); 444 volumes.addVolume(ref); 445 } 446 LOG.info("Added volume - " + dir + ", StorageType: " + storageType); 447 } 448 449 /** 450 * Removes a set of volumes from FsDataset. 451 * @param volumesToRemove a set of absolute root path of each volume. 452 * @param clearFailure set true to clear failure information. 453 */ 454 @Override removeVolumes(Set<File> volumesToRemove, boolean clearFailure)455 public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) { 456 // Make sure that all volumes are absolute path. 457 for (File vol : volumesToRemove) { 458 Preconditions.checkArgument(vol.isAbsolute(), 459 String.format("%s is not absolute path.", vol.getPath())); 460 } 461 462 Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>(); 463 List<String> storageToRemove = new ArrayList<>(); 464 synchronized (this) { 465 for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { 466 Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); 467 final File absRoot = sd.getRoot().getAbsoluteFile(); 468 if (volumesToRemove.contains(absRoot)) { 469 LOG.info("Removing " + absRoot + " from FsDataset."); 470 471 // Disable the volume from the service. 472 asyncDiskService.removeVolume(sd.getCurrentDir()); 473 volumes.removeVolume(absRoot, clearFailure); 474 475 // Removed all replica information for the blocks on the volume. 476 // Unlike updating the volumeMap in addVolume(), this operation does 477 // not scan disks. 478 for (String bpid : volumeMap.getBlockPoolList()) { 479 List<ReplicaInfo> blocks = new ArrayList<>(); 480 for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator(); 481 it.hasNext(); ) { 482 ReplicaInfo block = it.next(); 483 final File absBasePath = 484 new File(block.getVolume().getBasePath()).getAbsoluteFile(); 485 if (absBasePath.equals(absRoot)) { 486 blocks.add(block); 487 it.remove(); 488 } 489 } 490 blkToInvalidate.put(bpid, blocks); 491 } 492 493 storageToRemove.add(sd.getStorageUuid()); 494 } 495 } 496 setupAsyncLazyPersistThreads(); 497 } 498 499 // Call this outside the lock. 
500 for (Map.Entry<String, List<ReplicaInfo>> entry : 501 blkToInvalidate.entrySet()) { 502 String bpid = entry.getKey(); 503 List<ReplicaInfo> blocks = entry.getValue(); 504 for (ReplicaInfo block : blocks) { 505 invalidate(bpid, block); 506 } 507 } 508 509 synchronized (this) { 510 for(String storageUuid : storageToRemove) { 511 storageMap.remove(storageUuid); 512 } 513 } 514 } 515 getStorageTypeFromLocations( Collection<StorageLocation> dataLocations, File dir)516 private StorageType getStorageTypeFromLocations( 517 Collection<StorageLocation> dataLocations, File dir) { 518 for (StorageLocation dataLocation : dataLocations) { 519 if (dataLocation.getFile().equals(dir)) { 520 return dataLocation.getStorageType(); 521 } 522 } 523 return StorageType.DEFAULT; 524 } 525 526 /** 527 * Return the total space used by dfs datanode 528 */ 529 @Override // FSDatasetMBean getDfsUsed()530 public long getDfsUsed() throws IOException { 531 synchronized(statsLock) { 532 return volumes.getDfsUsed(); 533 } 534 } 535 536 /** 537 * Return the total space used by dfs datanode 538 */ 539 @Override // FSDatasetMBean getBlockPoolUsed(String bpid)540 public long getBlockPoolUsed(String bpid) throws IOException { 541 synchronized(statsLock) { 542 return volumes.getBlockPoolUsed(bpid); 543 } 544 } 545 546 /** 547 * Return true - if there are still valid volumes on the DataNode. 548 */ 549 @Override // FsDatasetSpi hasEnoughResource()550 public boolean hasEnoughResource() { 551 return getVolumes().size() >= validVolsRequired; 552 } 553 554 /** 555 * Return total capacity, used and unused 556 */ 557 @Override // FSDatasetMBean getCapacity()558 public long getCapacity() { 559 synchronized(statsLock) { 560 return volumes.getCapacity(); 561 } 562 } 563 564 /** 565 * Return how many bytes can still be stored in the FSDataset 566 */ 567 @Override // FSDatasetMBean getRemaining()568 public long getRemaining() throws IOException { 569 synchronized(statsLock) { 570 return volumes.getRemaining(); 571 } 572 } 573 574 /** 575 * Return the number of failed volumes in the FSDataset. 
576 */ 577 @Override // FSDatasetMBean getNumFailedVolumes()578 public int getNumFailedVolumes() { 579 return volumes.getVolumeFailureInfos().length; 580 } 581 582 @Override // FSDatasetMBean getFailedStorageLocations()583 public String[] getFailedStorageLocations() { 584 VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos(); 585 List<String> failedStorageLocations = Lists.newArrayListWithCapacity( 586 infos.length); 587 for (VolumeFailureInfo info: infos) { 588 failedStorageLocations.add(info.getFailedStorageLocation()); 589 } 590 return failedStorageLocations.toArray( 591 new String[failedStorageLocations.size()]); 592 } 593 594 @Override // FSDatasetMBean getLastVolumeFailureDate()595 public long getLastVolumeFailureDate() { 596 long lastVolumeFailureDate = 0; 597 for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) { 598 long failureDate = info.getFailureDate(); 599 if (failureDate > lastVolumeFailureDate) { 600 lastVolumeFailureDate = failureDate; 601 } 602 } 603 return lastVolumeFailureDate; 604 } 605 606 @Override // FSDatasetMBean getEstimatedCapacityLostTotal()607 public long getEstimatedCapacityLostTotal() { 608 long estimatedCapacityLostTotal = 0; 609 for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) { 610 estimatedCapacityLostTotal += info.getEstimatedCapacityLost(); 611 } 612 return estimatedCapacityLostTotal; 613 } 614 615 @Override // FsDatasetSpi getVolumeFailureSummary()616 public VolumeFailureSummary getVolumeFailureSummary() { 617 VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos(); 618 if (infos.length == 0) { 619 return null; 620 } 621 List<String> failedStorageLocations = Lists.newArrayListWithCapacity( 622 infos.length); 623 long lastVolumeFailureDate = 0; 624 long estimatedCapacityLostTotal = 0; 625 for (VolumeFailureInfo info: infos) { 626 failedStorageLocations.add(info.getFailedStorageLocation()); 627 long failureDate = info.getFailureDate(); 628 if (failureDate > lastVolumeFailureDate) { 629 lastVolumeFailureDate = failureDate; 630 } 631 estimatedCapacityLostTotal += info.getEstimatedCapacityLost(); 632 } 633 return new VolumeFailureSummary( 634 failedStorageLocations.toArray(new String[failedStorageLocations.size()]), 635 lastVolumeFailureDate, estimatedCapacityLostTotal); 636 } 637 638 @Override // FSDatasetMBean getCacheUsed()639 public long getCacheUsed() { 640 return cacheManager.getCacheUsed(); 641 } 642 643 @Override // FSDatasetMBean getCacheCapacity()644 public long getCacheCapacity() { 645 return cacheManager.getCacheCapacity(); 646 } 647 648 @Override // FSDatasetMBean getNumBlocksFailedToCache()649 public long getNumBlocksFailedToCache() { 650 return cacheManager.getNumBlocksFailedToCache(); 651 } 652 653 @Override // FSDatasetMBean getNumBlocksFailedToUncache()654 public long getNumBlocksFailedToUncache() { 655 return cacheManager.getNumBlocksFailedToUncache(); 656 } 657 658 @Override // FSDatasetMBean getNumBlocksCached()659 public long getNumBlocksCached() { 660 return cacheManager.getNumBlocksCached(); 661 } 662 663 /** 664 * Find the block's on-disk length 665 */ 666 @Override // FsDatasetSpi getLength(ExtendedBlock b)667 public long getLength(ExtendedBlock b) throws IOException { 668 return getBlockFile(b).length(); 669 } 670 671 /** 672 * Get File name for a given block. 673 */ getBlockFile(ExtendedBlock b)674 private File getBlockFile(ExtendedBlock b) throws IOException { 675 return getBlockFile(b.getBlockPoolId(), b.getBlockId()); 676 } 677 678 /** 679 * Get File name for a given block. 
680 */ getBlockFile(String bpid, long blockId)681 File getBlockFile(String bpid, long blockId) throws IOException { 682 File f = validateBlockFile(bpid, blockId); 683 if(f == null) { 684 throw new IOException("BlockId " + blockId + " is not valid."); 685 } 686 return f; 687 } 688 689 /** 690 * Return the File associated with a block, without first 691 * checking that it exists. This should be used when the 692 * next operation is going to open the file for read anyway, 693 * and thus the exists check is redundant. 694 * 695 * @param touch if true then update the last access timestamp of the 696 * block. Currently used for blocks on transient storage. 697 */ getBlockFileNoExistsCheck(ExtendedBlock b, boolean touch)698 private File getBlockFileNoExistsCheck(ExtendedBlock b, 699 boolean touch) 700 throws IOException { 701 final File f; 702 synchronized(this) { 703 f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch); 704 } 705 if (f == null) { 706 throw new IOException("Block " + b + " is not valid"); 707 } 708 return f; 709 } 710 711 @Override // FsDatasetSpi getBlockInputStream(ExtendedBlock b, long seekOffset)712 public InputStream getBlockInputStream(ExtendedBlock b, 713 long seekOffset) throws IOException { 714 File blockFile = getBlockFileNoExistsCheck(b, true); 715 if (isNativeIOAvailable) { 716 return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset); 717 } else { 718 try { 719 return openAndSeek(blockFile, seekOffset); 720 } catch (FileNotFoundException fnfe) { 721 throw new IOException("Block " + b + " is not valid. " + 722 "Expected block file at " + blockFile + " does not exist."); 723 } 724 } 725 } 726 727 /** 728 * Get the meta info of a block stored in volumeMap. To find a block, 729 * block pool Id, block Id and generation stamp must match. 730 * @param b extended block 731 * @return the meta replica information 732 * @throws ReplicaNotFoundException if no entry is in the map or 733 * there is a generation stamp mismatch 734 */ getReplicaInfo(ExtendedBlock b)735 ReplicaInfo getReplicaInfo(ExtendedBlock b) 736 throws ReplicaNotFoundException { 737 ReplicaInfo info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); 738 if (info == null) { 739 throw new ReplicaNotFoundException( 740 ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); 741 } 742 return info; 743 } 744 745 /** 746 * Get the meta info of a block stored in volumeMap. Block is looked up 747 * without matching the generation stamp. 
748 * @param bpid block pool Id 749 * @param blkid block Id 750 * @return the meta replica information; null if block was not found 751 * @throws ReplicaNotFoundException if no entry is in the map or 752 * there is a generation stamp mismatch 753 */ getReplicaInfo(String bpid, long blkid)754 private ReplicaInfo getReplicaInfo(String bpid, long blkid) 755 throws ReplicaNotFoundException { 756 ReplicaInfo info = volumeMap.get(bpid, blkid); 757 if (info == null) { 758 throw new ReplicaNotFoundException( 759 ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid); 760 } 761 return info; 762 } 763 764 /** 765 * Returns handles to the block file and its metadata file 766 */ 767 @Override // FsDatasetSpi getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset)768 public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, 769 long blkOffset, long metaOffset) throws IOException { 770 ReplicaInfo info = getReplicaInfo(b); 771 FsVolumeReference ref = info.getVolume().obtainReference(); 772 try { 773 InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset); 774 try { 775 InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset); 776 return new ReplicaInputStreams(blockInStream, metaInStream, ref); 777 } catch (IOException e) { 778 IOUtils.cleanup(null, blockInStream); 779 throw e; 780 } 781 } catch (IOException e) { 782 IOUtils.cleanup(null, ref); 783 throw e; 784 } 785 } 786 openAndSeek(File file, long offset)787 private static FileInputStream openAndSeek(File file, long offset) 788 throws IOException { 789 RandomAccessFile raf = null; 790 try { 791 raf = new RandomAccessFile(file, "r"); 792 if (offset > 0) { 793 raf.seek(offset); 794 } 795 return new FileInputStream(raf.getFD()); 796 } catch(IOException ioe) { 797 IOUtils.cleanup(null, raf); 798 throw ioe; 799 } 800 } 801 moveBlockFiles(Block b, File srcfile, File destdir)802 static File moveBlockFiles(Block b, File srcfile, File destdir) 803 throws IOException { 804 final File dstfile = new File(destdir, b.getBlockName()); 805 final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp()); 806 final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp()); 807 try { 808 NativeIO.renameTo(srcmeta, dstmeta); 809 } catch (IOException e) { 810 throw new IOException("Failed to move meta file for " + b 811 + " from " + srcmeta + " to " + dstmeta, e); 812 } 813 try { 814 NativeIO.renameTo(srcfile, dstfile); 815 } catch (IOException e) { 816 throw new IOException("Failed to move block file for " + b 817 + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e); 818 } 819 if (LOG.isDebugEnabled()) { 820 LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta 821 + " and " + srcfile + " to " + dstfile); 822 } 823 return dstfile; 824 } 825 826 /** 827 * Copy the block and meta files for the given block to the given destination. 828 * @return the new meta and block files. 
829 * @throws IOException 830 */ copyBlockFiles(long blockId, long genStamp, File srcMeta, File srcFile, File destRoot, boolean calculateChecksum)831 static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta, 832 File srcFile, File destRoot, boolean calculateChecksum) 833 throws IOException { 834 final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId); 835 final File dstFile = new File(destDir, srcFile.getName()); 836 final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp); 837 return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum); 838 } 839 copyBlockFiles(File srcMeta, File srcFile, File dstMeta, File dstFile, boolean calculateChecksum)840 static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta, 841 File dstFile, boolean calculateChecksum) 842 throws IOException { 843 if (calculateChecksum) { 844 computeChecksum(srcMeta, dstMeta, srcFile); 845 } else { 846 try { 847 Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true); 848 } catch (IOException e) { 849 throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e); 850 } 851 } 852 853 try { 854 Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true); 855 } catch (IOException e) { 856 throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e); 857 } 858 if (LOG.isDebugEnabled()) { 859 if (calculateChecksum) { 860 LOG.debug("Copied " + srcMeta + " to " + dstMeta 861 + " and calculated checksum"); 862 } else { 863 LOG.debug("Copied " + srcFile + " to " + dstFile); 864 } 865 } 866 return new File[] {dstMeta, dstFile}; 867 } 868 869 /** 870 * Move block files from one storage to another storage. 871 * @return Returns the Old replicaInfo 872 * @throws IOException 873 */ 874 @Override moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType)875 public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, 876 StorageType targetStorageType) throws IOException { 877 ReplicaInfo replicaInfo = getReplicaInfo(block); 878 if (replicaInfo.getState() != ReplicaState.FINALIZED) { 879 throw new ReplicaNotFoundException( 880 ReplicaNotFoundException.UNFINALIZED_REPLICA + block); 881 } 882 if (replicaInfo.getNumBytes() != block.getNumBytes()) { 883 throw new IOException("Corrupted replica " + replicaInfo 884 + " with a length of " + replicaInfo.getNumBytes() 885 + " expected length is " + block.getNumBytes()); 886 } 887 if (replicaInfo.getVolume().getStorageType() == targetStorageType) { 888 throw new ReplicaAlreadyExistsException("Replica " + replicaInfo 889 + " already exists on storage " + targetStorageType); 890 } 891 892 if (replicaInfo.isOnTransientStorage()) { 893 // Block movement from RAM_DISK will be done by LazyPersist mechanism 894 throw new IOException("Replica " + replicaInfo 895 + " cannot be moved from storageType : " 896 + replicaInfo.getVolume().getStorageType()); 897 } 898 899 try (FsVolumeReference volumeRef = volumes.getNextVolume( 900 targetStorageType, block.getNumBytes())) { 901 File oldBlockFile = replicaInfo.getBlockFile(); 902 File oldMetaFile = replicaInfo.getMetaFile(); 903 FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume(); 904 // Copy files to temp dir first 905 File[] blockFiles = copyBlockFiles(block.getBlockId(), 906 block.getGenerationStamp(), oldMetaFile, oldBlockFile, 907 targetVolume.getTmpDir(block.getBlockPoolId()), 908 replicaInfo.isOnTransientStorage()); 909 910 ReplicaInfo newReplicaInfo = new ReplicaInPipeline( 911 replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(), 912 
targetVolume, blockFiles[0].getParentFile(), 0); 913 newReplicaInfo.setNumBytes(blockFiles[1].length()); 914 // Finalize the copied files 915 newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); 916 917 removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile, 918 oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId()); 919 } 920 921 // Replace the old block if any to reschedule the scanning. 922 return replicaInfo; 923 } 924 925 /** 926 * Compute and store the checksum for a block file that does not already have 927 * its checksum computed. 928 * 929 * @param srcMeta source meta file, containing only the checksum header, not a 930 * calculated checksum 931 * @param dstMeta destination meta file, into which this method will write a 932 * full computed checksum 933 * @param blockFile block file for which the checksum will be computed 934 * @throws IOException 935 */ computeChecksum(File srcMeta, File dstMeta, File blockFile)936 private static void computeChecksum(File srcMeta, File dstMeta, File blockFile) 937 throws IOException { 938 final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta); 939 final byte[] data = new byte[1 << 16]; 940 final byte[] crcs = new byte[checksum.getChecksumSize(data.length)]; 941 942 DataOutputStream metaOut = null; 943 try { 944 File parentFile = dstMeta.getParentFile(); 945 if (parentFile != null) { 946 if (!parentFile.mkdirs() && !parentFile.isDirectory()) { 947 throw new IOException("Destination '" + parentFile 948 + "' directory cannot be created"); 949 } 950 } 951 metaOut = new DataOutputStream(new BufferedOutputStream( 952 new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE)); 953 BlockMetadataHeader.writeHeader(metaOut, checksum); 954 955 int offset = 0; 956 try (InputStream dataIn = isNativeIOAvailable ? 
957 NativeIO.getShareDeleteFileInputStream(blockFile) : 958 new FileInputStream(blockFile)) { 959 960 for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) { 961 if (n > 0) { 962 n += offset; 963 offset = n % checksum.getBytesPerChecksum(); 964 final int length = n - offset; 965 966 if (length > 0) { 967 checksum.calculateChunkedSums(data, 0, length, crcs, 0); 968 metaOut.write(crcs, 0, checksum.getChecksumSize(length)); 969 970 System.arraycopy(data, length, data, 0, offset); 971 } 972 } 973 } 974 } 975 976 // calculate and write the last crc 977 checksum.calculateChunkedSums(data, 0, offset, crcs, 0); 978 metaOut.write(crcs, 0, 4); 979 } finally { 980 IOUtils.cleanup(LOG, metaOut); 981 } 982 } 983 truncateBlock(File blockFile, File metaFile, long oldlen, long newlen)984 static private void truncateBlock(File blockFile, File metaFile, 985 long oldlen, long newlen) throws IOException { 986 LOG.info("truncateBlock: blockFile=" + blockFile 987 + ", metaFile=" + metaFile 988 + ", oldlen=" + oldlen 989 + ", newlen=" + newlen); 990 991 if (newlen == oldlen) { 992 return; 993 } 994 if (newlen > oldlen) { 995 throw new IOException("Cannot truncate block to from oldlen (=" + oldlen 996 + ") to newlen (=" + newlen + ")"); 997 } 998 999 DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 1000 int checksumsize = dcs.getChecksumSize(); 1001 int bpc = dcs.getBytesPerChecksum(); 1002 long n = (newlen - 1)/bpc + 1; 1003 long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize; 1004 long lastchunkoffset = (n - 1)*bpc; 1005 int lastchunksize = (int)(newlen - lastchunkoffset); 1006 byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 1007 1008 RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); 1009 try { 1010 //truncate blockFile 1011 blockRAF.setLength(newlen); 1012 1013 //read last chunk 1014 blockRAF.seek(lastchunkoffset); 1015 blockRAF.readFully(b, 0, lastchunksize); 1016 } finally { 1017 blockRAF.close(); 1018 } 1019 1020 //compute checksum 1021 dcs.update(b, 0, lastchunksize); 1022 dcs.writeValue(b, 0, false); 1023 1024 //update metaFile 1025 RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); 1026 try { 1027 metaRAF.setLength(newmetalen); 1028 metaRAF.seek(newmetalen - checksumsize); 1029 metaRAF.write(b, 0, checksumsize); 1030 } finally { 1031 metaRAF.close(); 1032 } 1033 } 1034 1035 1036 @Override // FsDatasetSpi append(ExtendedBlock b, long newGS, long expectedBlockLen)1037 public synchronized ReplicaHandler append(ExtendedBlock b, 1038 long newGS, long expectedBlockLen) throws IOException { 1039 // If the block was successfully finalized because all packets 1040 // were successfully processed at the Datanode but the ack for 1041 // some of the packets were not received by the client. The client 1042 // re-opens the connection and retries sending those packets. 1043 // The other reason is that an "append" is occurring to this block. 
1044 1045 // check the validity of the parameter 1046 if (newGS < b.getGenerationStamp()) { 1047 throw new IOException("The new generation stamp " + newGS + 1048 " should be greater than the replica " + b + "'s generation stamp"); 1049 } 1050 ReplicaInfo replicaInfo = getReplicaInfo(b); 1051 LOG.info("Appending to " + replicaInfo); 1052 if (replicaInfo.getState() != ReplicaState.FINALIZED) { 1053 throw new ReplicaNotFoundException( 1054 ReplicaNotFoundException.UNFINALIZED_REPLICA + b); 1055 } 1056 if (replicaInfo.getNumBytes() != expectedBlockLen) { 1057 throw new IOException("Corrupted replica " + replicaInfo + 1058 " with a length of " + replicaInfo.getNumBytes() + 1059 " expected length is " + expectedBlockLen); 1060 } 1061 1062 FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); 1063 ReplicaBeingWritten replica = null; 1064 try { 1065 replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS, 1066 b.getNumBytes()); 1067 } catch (IOException e) { 1068 IOUtils.cleanup(null, ref); 1069 throw e; 1070 } 1071 return new ReplicaHandler(replica, ref); 1072 } 1073 1074 /** Append to a finalized replica 1075 * Change a finalized replica to be a RBW replica and 1076 * bump its generation stamp to be the newGS 1077 * 1078 * @param bpid block pool Id 1079 * @param replicaInfo a finalized replica 1080 * @param newGS new generation stamp 1081 * @param estimateBlockLen estimate generation stamp 1082 * @return a RBW replica 1083 * @throws IOException if moving the replica from finalized directory 1084 * to rbw directory fails 1085 */ append(String bpid, FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)1086 private synchronized ReplicaBeingWritten append(String bpid, 1087 FinalizedReplica replicaInfo, long newGS, long estimateBlockLen) 1088 throws IOException { 1089 // If the block is cached, start uncaching it. 1090 cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId()); 1091 // unlink the finalized replica 1092 replicaInfo.unlinkBlock(1); 1093 1094 // construct a RBW replica with the new GS 1095 File blkfile = replicaInfo.getBlockFile(); 1096 FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume(); 1097 if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) { 1098 throw new DiskOutOfSpaceException("Insufficient space for appending to " 1099 + replicaInfo); 1100 } 1101 File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName()); 1102 File oldmeta = replicaInfo.getMetaFile(); 1103 ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( 1104 replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, 1105 v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen); 1106 File newmeta = newReplicaInfo.getMetaFile(); 1107 1108 // rename meta file to rbw directory 1109 if (LOG.isDebugEnabled()) { 1110 LOG.debug("Renaming " + oldmeta + " to " + newmeta); 1111 } 1112 try { 1113 NativeIO.renameTo(oldmeta, newmeta); 1114 } catch (IOException e) { 1115 throw new IOException("Block " + replicaInfo + " reopen failed. 
" + 1116 " Unable to move meta file " + oldmeta + 1117 " to rbw dir " + newmeta, e); 1118 } 1119 1120 // rename block file to rbw directory 1121 if (LOG.isDebugEnabled()) { 1122 LOG.debug("Renaming " + blkfile + " to " + newBlkFile 1123 + ", file length=" + blkfile.length()); 1124 } 1125 try { 1126 NativeIO.renameTo(blkfile, newBlkFile); 1127 } catch (IOException e) { 1128 try { 1129 NativeIO.renameTo(newmeta, oldmeta); 1130 } catch (IOException ex) { 1131 LOG.warn("Cannot move meta file " + newmeta + 1132 "back to the finalized directory " + oldmeta, ex); 1133 } 1134 throw new IOException("Block " + replicaInfo + " reopen failed. " + 1135 " Unable to move block file " + blkfile + 1136 " to rbw dir " + newBlkFile, e); 1137 } 1138 1139 // Replace finalized replica by a RBW replica in replicas map 1140 volumeMap.add(bpid, newReplicaInfo); 1141 v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes()); 1142 return newReplicaInfo; 1143 } 1144 recoverCheck(ExtendedBlock b, long newGS, long expectedBlockLen)1145 private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 1146 long expectedBlockLen) throws IOException { 1147 ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); 1148 1149 // check state 1150 if (replicaInfo.getState() != ReplicaState.FINALIZED && 1151 replicaInfo.getState() != ReplicaState.RBW) { 1152 throw new ReplicaNotFoundException( 1153 ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo); 1154 } 1155 1156 // check generation stamp 1157 long replicaGenerationStamp = replicaInfo.getGenerationStamp(); 1158 if (replicaGenerationStamp < b.getGenerationStamp() || 1159 replicaGenerationStamp > newGS) { 1160 throw new ReplicaNotFoundException( 1161 ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp 1162 + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 1163 newGS + "]."); 1164 } 1165 1166 // stop the previous writer before check a replica's length 1167 long replicaLen = replicaInfo.getNumBytes(); 1168 if (replicaInfo.getState() == ReplicaState.RBW) { 1169 ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; 1170 // kill the previous writer 1171 rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); 1172 rbw.setWriter(Thread.currentThread()); 1173 // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same 1174 if (replicaLen != rbw.getBytesOnDisk() 1175 || replicaLen != rbw.getBytesAcked()) { 1176 throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 1177 "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 1178 rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() + 1179 ") are not the same."); 1180 } 1181 } 1182 1183 // check block length 1184 if (replicaLen != expectedBlockLen) { 1185 throw new IOException("Corrupted replica " + replicaInfo + 1186 " with a length of " + replicaLen + 1187 " expected length is " + expectedBlockLen); 1188 } 1189 1190 return replicaInfo; 1191 } 1192 1193 @Override // FsDatasetSpi recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen)1194 public synchronized ReplicaHandler recoverAppend( 1195 ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { 1196 LOG.info("Recover failed append to " + b); 1197 1198 ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); 1199 1200 FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); 1201 ReplicaBeingWritten replica; 1202 try { 1203 // change the replica's state/gs etc. 
1204 if (replicaInfo.getState() == ReplicaState.FINALIZED) { 1205 replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, 1206 newGS, b.getNumBytes()); 1207 } else { //RBW 1208 bumpReplicaGS(replicaInfo, newGS); 1209 replica = (ReplicaBeingWritten) replicaInfo; 1210 } 1211 } catch (IOException e) { 1212 IOUtils.cleanup(null, ref); 1213 throw e; 1214 } 1215 return new ReplicaHandler(replica, ref); 1216 } 1217 1218 @Override // FsDatasetSpi recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)1219 public synchronized String recoverClose(ExtendedBlock b, long newGS, 1220 long expectedBlockLen) throws IOException { 1221 LOG.info("Recover failed close " + b); 1222 // check replica's state 1223 ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); 1224 // bump the replica's GS 1225 bumpReplicaGS(replicaInfo, newGS); 1226 // finalize the replica if RBW 1227 if (replicaInfo.getState() == ReplicaState.RBW) { 1228 finalizeReplica(b.getBlockPoolId(), replicaInfo); 1229 } 1230 return replicaInfo.getStorageUuid(); 1231 } 1232 1233 /** 1234 * Bump a replica's generation stamp to a new one. 1235 * Its on-disk meta file name is renamed to be the new one too. 1236 * 1237 * @param replicaInfo a replica 1238 * @param newGS new generation stamp 1239 * @throws IOException if rename fails 1240 */ bumpReplicaGS(ReplicaInfo replicaInfo, long newGS)1241 private void bumpReplicaGS(ReplicaInfo replicaInfo, 1242 long newGS) throws IOException { 1243 long oldGS = replicaInfo.getGenerationStamp(); 1244 File oldmeta = replicaInfo.getMetaFile(); 1245 replicaInfo.setGenerationStamp(newGS); 1246 File newmeta = replicaInfo.getMetaFile(); 1247 1248 // rename meta file to new GS 1249 if (LOG.isDebugEnabled()) { 1250 LOG.debug("Renaming " + oldmeta + " to " + newmeta); 1251 } 1252 try { 1253 NativeIO.renameTo(oldmeta, newmeta); 1254 } catch (IOException e) { 1255 replicaInfo.setGenerationStamp(oldGS); // restore old GS 1256 throw new IOException("Block " + replicaInfo + " reopen failed. " + 1257 " Unable to move meta file " + oldmeta + 1258 " to " + newmeta, e); 1259 } 1260 } 1261 1262 @Override // FsDatasetSpi createRbw( StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)1263 public synchronized ReplicaHandler createRbw( 1264 StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) 1265 throws IOException { 1266 ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 1267 b.getBlockId()); 1268 if (replicaInfo != null) { 1269 throw new ReplicaAlreadyExistsException("Block " + b + 1270 " already exists in state " + replicaInfo.getState() + 1271 " and thus cannot be created."); 1272 } 1273 // create a new block 1274 FsVolumeReference ref; 1275 while (true) { 1276 try { 1277 if (allowLazyPersist) { 1278 // First try to place the block on a transient volume. 
1279 ref = volumes.getNextTransientVolume(b.getNumBytes()); 1280 datanode.getMetrics().incrRamDiskBlocksWrite(); 1281 } else { 1282 ref = volumes.getNextVolume(storageType, b.getNumBytes()); 1283 } 1284 } catch (DiskOutOfSpaceException de) { 1285 if (allowLazyPersist) { 1286 datanode.getMetrics().incrRamDiskBlocksWriteFallback(); 1287 allowLazyPersist = false; 1288 continue; 1289 } 1290 throw de; 1291 } 1292 break; 1293 } 1294 FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); 1295 // create an rbw file to hold block in the designated volume 1296 File f; 1297 try { 1298 f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); 1299 } catch (IOException e) { 1300 IOUtils.cleanup(null, ref); 1301 throw e; 1302 } 1303 1304 ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 1305 b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes()); 1306 volumeMap.add(b.getBlockPoolId(), newReplicaInfo); 1307 return new ReplicaHandler(newReplicaInfo, ref); 1308 } 1309 1310 @Override // FsDatasetSpi recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)1311 public synchronized ReplicaHandler recoverRbw( 1312 ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) 1313 throws IOException { 1314 LOG.info("Recover RBW replica " + b); 1315 1316 ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); 1317 1318 // check the replica's state 1319 if (replicaInfo.getState() != ReplicaState.RBW) { 1320 throw new ReplicaNotFoundException( 1321 ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo); 1322 } 1323 ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; 1324 1325 LOG.info("Recovering " + rbw); 1326 1327 // Stop the previous writer 1328 rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); 1329 rbw.setWriter(Thread.currentThread()); 1330 1331 // check generation stamp 1332 long replicaGenerationStamp = rbw.getGenerationStamp(); 1333 if (replicaGenerationStamp < b.getGenerationStamp() || 1334 replicaGenerationStamp > newGS) { 1335 throw new ReplicaNotFoundException( 1336 ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b + 1337 ". Expected GS range is [" + b.getGenerationStamp() + ", " + 1338 newGS + "]."); 1339 } 1340 1341 // check replica length 1342 long bytesAcked = rbw.getBytesAcked(); 1343 long numBytes = rbw.getNumBytes(); 1344 if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){ 1345 throw new ReplicaNotFoundException("Unmatched length replica " + 1346 replicaInfo + ": BytesAcked = " + bytesAcked + 1347 " BytesRcvd = " + numBytes + " are not in the range of [" + 1348 minBytesRcvd + ", " + maxBytesRcvd + "]."); 1349 } 1350 1351 FsVolumeReference ref = rbw.getVolume().obtainReference(); 1352 try { 1353 // Truncate the potentially corrupt portion. 1354 // If the source was client and the last node in the pipeline was lost, 1355 // any corrupt data written after the acked length can go unnoticed. 
1356 if (numBytes > bytesAcked) { 1357 final File replicafile = rbw.getBlockFile(); 1358 truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); 1359 rbw.setNumBytes(bytesAcked); 1360 rbw.setLastChecksumAndDataLen(bytesAcked, null); 1361 } 1362 1363 // bump the replica's generation stamp to newGS 1364 bumpReplicaGS(rbw, newGS); 1365 } catch (IOException e) { 1366 IOUtils.cleanup(null, ref); 1367 throw e; 1368 } 1369 return new ReplicaHandler(rbw, ref); 1370 } 1371 1372 @Override // FsDatasetSpi convertTemporaryToRbw( final ExtendedBlock b)1373 public synchronized ReplicaInPipeline convertTemporaryToRbw( 1374 final ExtendedBlock b) throws IOException { 1375 final long blockId = b.getBlockId(); 1376 final long expectedGs = b.getGenerationStamp(); 1377 final long visible = b.getNumBytes(); 1378 LOG.info("Convert " + b + " from Temporary to RBW, visible length=" 1379 + visible); 1380 1381 final ReplicaInPipeline temp; 1382 { 1383 // get replica 1384 final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId); 1385 if (r == null) { 1386 throw new ReplicaNotFoundException( 1387 ReplicaNotFoundException.NON_EXISTENT_REPLICA + b); 1388 } 1389 // check the replica's state 1390 if (r.getState() != ReplicaState.TEMPORARY) { 1391 throw new ReplicaAlreadyExistsException( 1392 "r.getState() != ReplicaState.TEMPORARY, r=" + r); 1393 } 1394 temp = (ReplicaInPipeline)r; 1395 } 1396 // check generation stamp 1397 if (temp.getGenerationStamp() != expectedGs) { 1398 throw new ReplicaAlreadyExistsException( 1399 "temp.getGenerationStamp() != expectedGs = " + expectedGs 1400 + ", temp=" + temp); 1401 } 1402 1403 // TODO: check writer? 1404 // set writer to the current thread 1405 // temp.setWriter(Thread.currentThread()); 1406 1407 // check length 1408 final long numBytes = temp.getNumBytes(); 1409 if (numBytes < visible) { 1410 throw new IOException(numBytes + " = numBytes < visible = " 1411 + visible + ", temp=" + temp); 1412 } 1413 // check volume 1414 final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume(); 1415 if (v == null) { 1416 throw new IOException("r.getVolume() = null, temp=" + temp); 1417 } 1418 1419 // move block files to the rbw directory 1420 BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId()); 1421 final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(), 1422 bpslice.getRbwDir()); 1423 // create RBW 1424 final ReplicaBeingWritten rbw = new ReplicaBeingWritten( 1425 blockId, numBytes, expectedGs, 1426 v, dest.getParentFile(), Thread.currentThread(), 0); 1427 rbw.setBytesAcked(visible); 1428 // overwrite the RBW in the volume map 1429 volumeMap.add(b.getBlockPoolId(), rbw); 1430 return rbw; 1431 } 1432 1433 @Override // FsDatasetSpi createTemporary( StorageType storageType, ExtendedBlock b)1434 public ReplicaHandler createTemporary( 1435 StorageType storageType, ExtendedBlock b) throws IOException { 1436 long startTimeMs = Time.monotonicNow(); 1437 long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); 1438 ReplicaInfo lastFoundReplicaInfo = null; 1439 do { 1440 synchronized (this) { 1441 ReplicaInfo currentReplicaInfo = 1442 volumeMap.get(b.getBlockPoolId(), b.getBlockId()); 1443 if (currentReplicaInfo == lastFoundReplicaInfo) { 1444 if (lastFoundReplicaInfo != null) { 1445 invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); 1446 } 1447 FsVolumeReference ref = 1448 volumes.getNextVolume(storageType, b.getNumBytes()); 1449 FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); 1450 // create a temporary file to hold block 
in the designated volume 1451 File f; 1452 try { 1453 f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); 1454 } catch (IOException e) { 1455 IOUtils.cleanup(null, ref); 1456 throw e; 1457 } 1458 ReplicaInPipeline newReplicaInfo = 1459 new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, 1460 f.getParentFile(), 0); 1461 volumeMap.add(b.getBlockPoolId(), newReplicaInfo); 1462 return new ReplicaHandler(newReplicaInfo, ref); 1463 } else { 1464 if (!(currentReplicaInfo.getGenerationStamp() < b 1465 .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { 1466 throw new ReplicaAlreadyExistsException("Block " + b 1467 + " already exists in state " + currentReplicaInfo.getState() 1468 + " and thus cannot be created."); 1469 } 1470 lastFoundReplicaInfo = currentReplicaInfo; 1471 } 1472 } 1473 1474 // Hang too long, just bail out. This is not supposed to happen. 1475 long writerStopMs = Time.monotonicNow() - startTimeMs; 1476 if (writerStopMs > writerStopTimeoutMs) { 1477 LOG.warn("Unable to stop existing writer for block " + b + " after " 1478 + writerStopMs + " miniseconds."); 1479 throw new IOException("Unable to stop existing writer for block " + b 1480 + " after " + writerStopMs + " miniseconds."); 1481 } 1482 1483 // Stop the previous writer 1484 ((ReplicaInPipeline) lastFoundReplicaInfo) 1485 .stopWriter(writerStopTimeoutMs); 1486 } while (true); 1487 } 1488 1489 /** 1490 * Sets the offset in the meta file so that the 1491 * last checksum will be overwritten. 1492 */ 1493 @Override // FsDatasetSpi adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, int checksumSize)1494 public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, 1495 int checksumSize) throws IOException { 1496 FileOutputStream file = (FileOutputStream)streams.getChecksumOut(); 1497 FileChannel channel = file.getChannel(); 1498 long oldPos = channel.position(); 1499 long newPos = oldPos - checksumSize; 1500 if (LOG.isDebugEnabled()) { 1501 LOG.debug("Changing meta file offset of block " + b + " from " + 1502 oldPos + " to " + newPos); 1503 } 1504 channel.position(newPos); 1505 } 1506 1507 // 1508 // REMIND - mjc - eventually we should have a timeout system 1509 // in place to clean up block files left by abandoned clients. 1510 // We should have some timer in place, so that if a blockfile 1511 // is created but non-valid, and has been idle for >48 hours, 1512 // we can GC it safely. 1513 // 1514 1515 /** 1516 * Complete the block write! 
1517 */ 1518 @Override // FsDatasetSpi finalizeBlock(ExtendedBlock b)1519 public synchronized void finalizeBlock(ExtendedBlock b) throws IOException { 1520 if (Thread.interrupted()) { 1521 // Don't allow data modifications from interrupted threads 1522 throw new IOException("Cannot finalize block from Interrupted Thread"); 1523 } 1524 ReplicaInfo replicaInfo = getReplicaInfo(b); 1525 if (replicaInfo.getState() == ReplicaState.FINALIZED) { 1526 // this is legal, when recovery happens on a file that has 1527 // been opened for append but never modified 1528 return; 1529 } 1530 finalizeReplica(b.getBlockPoolId(), replicaInfo); 1531 } 1532 finalizeReplica(String bpid, ReplicaInfo replicaInfo)1533 private synchronized FinalizedReplica finalizeReplica(String bpid, 1534 ReplicaInfo replicaInfo) throws IOException { 1535 FinalizedReplica newReplicaInfo = null; 1536 if (replicaInfo.getState() == ReplicaState.RUR && 1537 ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() == 1538 ReplicaState.FINALIZED) { 1539 newReplicaInfo = (FinalizedReplica) 1540 ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica(); 1541 } else { 1542 FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume(); 1543 File f = replicaInfo.getBlockFile(); 1544 if (v == null) { 1545 throw new IOException("No volume for temporary file " + f + 1546 " for block " + replicaInfo); 1547 } 1548 1549 File dest = v.addFinalizedBlock( 1550 bpid, replicaInfo, f, replicaInfo.getBytesReserved()); 1551 newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); 1552 1553 if (v.isTransientStorage()) { 1554 ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v); 1555 datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes()); 1556 } 1557 } 1558 volumeMap.add(bpid, newReplicaInfo); 1559 1560 return newReplicaInfo; 1561 } 1562 1563 /** 1564 * Remove the temporary block file (if any) 1565 */ 1566 @Override // FsDatasetSpi unfinalizeBlock(ExtendedBlock b)1567 public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException { 1568 ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 1569 b.getLocalBlock()); 1570 if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) { 1571 // remove from volumeMap 1572 volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock()); 1573 1574 // delete the on-disk temp file 1575 if (delBlockFromDisk(replicaInfo.getBlockFile(), 1576 replicaInfo.getMetaFile(), b.getLocalBlock())) { 1577 LOG.warn("Block " + b + " unfinalized and removed. 
" ); 1578 } 1579 if (replicaInfo.getVolume().isTransientStorage()) { 1580 ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true); 1581 } 1582 } 1583 } 1584 1585 /** 1586 * Remove a block from disk 1587 * @param blockFile block file 1588 * @param metaFile block meta file 1589 * @param b a block 1590 * @return true if on-disk files are deleted; false otherwise 1591 */ delBlockFromDisk(File blockFile, File metaFile, Block b)1592 private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) { 1593 if (blockFile == null) { 1594 LOG.warn("No file exists for block: " + b); 1595 return true; 1596 } 1597 1598 if (!blockFile.delete()) { 1599 LOG.warn("Not able to delete the block file: " + blockFile); 1600 return false; 1601 } else { // remove the meta file 1602 if (metaFile != null && !metaFile.delete()) { 1603 LOG.warn("Not able to delete the meta block file: " + metaFile); 1604 return false; 1605 } 1606 } 1607 return true; 1608 } 1609 1610 @Override getBlockReports(String bpid)1611 public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) { 1612 Map<DatanodeStorage, BlockListAsLongs> blockReportsMap = 1613 new HashMap<DatanodeStorage, BlockListAsLongs>(); 1614 1615 Map<String, BlockListAsLongs.Builder> builders = 1616 new HashMap<String, BlockListAsLongs.Builder>(); 1617 1618 List<FsVolumeImpl> curVolumes = getVolumes(); 1619 for (FsVolumeSpi v : curVolumes) { 1620 builders.put(v.getStorageID(), BlockListAsLongs.builder()); 1621 } 1622 1623 synchronized(this) { 1624 for (ReplicaInfo b : volumeMap.replicas(bpid)) { 1625 switch(b.getState()) { 1626 case FINALIZED: 1627 case RBW: 1628 case RWR: 1629 builders.get(b.getVolume().getStorageID()).add(b); 1630 break; 1631 case RUR: 1632 ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b; 1633 builders.get(rur.getVolume().getStorageID()) 1634 .add(rur.getOriginalReplica()); 1635 break; 1636 case TEMPORARY: 1637 break; 1638 default: 1639 assert false : "Illegal ReplicaInfo state."; 1640 } 1641 } 1642 } 1643 1644 for (FsVolumeImpl v : curVolumes) { 1645 blockReportsMap.put(v.toDatanodeStorage(), 1646 builders.get(v.getStorageID()).build()); 1647 } 1648 1649 return blockReportsMap; 1650 } 1651 1652 @Override // FsDatasetSpi getCacheReport(String bpid)1653 public List<Long> getCacheReport(String bpid) { 1654 return cacheManager.getCachedBlocks(bpid); 1655 } 1656 1657 /** 1658 * Get the list of finalized blocks from in-memory blockmap for a block pool. 1659 */ 1660 @Override getFinalizedBlocks(String bpid)1661 public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) { 1662 ArrayList<FinalizedReplica> finalized = 1663 new ArrayList<FinalizedReplica>(volumeMap.size(bpid)); 1664 for (ReplicaInfo b : volumeMap.replicas(bpid)) { 1665 if(b.getState() == ReplicaState.FINALIZED) { 1666 finalized.add(new FinalizedReplica((FinalizedReplica)b)); 1667 } 1668 } 1669 return finalized; 1670 } 1671 1672 /** 1673 * Get the list of finalized blocks from in-memory blockmap for a block pool. 
1674 */ 1675 @Override getFinalizedBlocksOnPersistentStorage(String bpid)1676 public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) { 1677 ArrayList<FinalizedReplica> finalized = 1678 new ArrayList<FinalizedReplica>(volumeMap.size(bpid)); 1679 for (ReplicaInfo b : volumeMap.replicas(bpid)) { 1680 if(!b.getVolume().isTransientStorage() && 1681 b.getState() == ReplicaState.FINALIZED) { 1682 finalized.add(new FinalizedReplica((FinalizedReplica)b)); 1683 } 1684 } 1685 return finalized; 1686 } 1687 1688 /** 1689 * Check if a block is valid. 1690 * 1691 * @param b The block to check. 1692 * @param minLength The minimum length that the block must have. May be 0. 1693 * @param state If this is null, it is ignored. If it is non-null, we 1694 * will check that the replica has this state. 1695 * 1696 * @throws ReplicaNotFoundException If the replica is not found 1697 * 1698 * @throws UnexpectedReplicaStateException If the replica is not in the 1699 * expected state. 1700 * @throws FileNotFoundException If the block file is not found or there 1701 * was an error locating it. 1702 * @throws EOFException If the replica length is too short. 1703 * 1704 * @throws IOException May be thrown from the methods called. 1705 */ checkBlock(ExtendedBlock b, long minLength, ReplicaState state)1706 public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) 1707 throws ReplicaNotFoundException, UnexpectedReplicaStateException, 1708 FileNotFoundException, EOFException, IOException { 1709 final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 1710 b.getLocalBlock()); 1711 if (replicaInfo == null) { 1712 throw new ReplicaNotFoundException(b); 1713 } 1714 if (replicaInfo.getState() != state) { 1715 throw new UnexpectedReplicaStateException(b,state); 1716 } 1717 if (!replicaInfo.getBlockFile().exists()) { 1718 throw new FileNotFoundException(replicaInfo.getBlockFile().getPath()); 1719 } 1720 long onDiskLength = getLength(b); 1721 if (onDiskLength < minLength) { 1722 throw new EOFException(b + "'s on-disk length " + onDiskLength 1723 + " is shorter than minLength " + minLength); 1724 } 1725 } 1726 1727 /** 1728 * Check whether the given block is a valid one. 1729 * valid means finalized 1730 */ 1731 @Override // FsDatasetSpi isValidBlock(ExtendedBlock b)1732 public boolean isValidBlock(ExtendedBlock b) { 1733 return isValid(b, ReplicaState.FINALIZED); 1734 } 1735 1736 /** 1737 * Check whether the given block is a valid RBW. 1738 */ 1739 @Override // {@link FsDatasetSpi} isValidRbw(final ExtendedBlock b)1740 public boolean isValidRbw(final ExtendedBlock b) { 1741 return isValid(b, ReplicaState.RBW); 1742 } 1743 1744 /** Does the block exist and have the given state? */ isValid(final ExtendedBlock b, final ReplicaState state)1745 private boolean isValid(final ExtendedBlock b, final ReplicaState state) { 1746 try { 1747 checkBlock(b, 0, state); 1748 } catch (IOException e) { 1749 return false; 1750 } 1751 return true; 1752 } 1753 1754 /** 1755 * Find the file corresponding to the block and return it if it exists. 1756 */ validateBlockFile(String bpid, long blockId)1757 File validateBlockFile(String bpid, long blockId) { 1758 //Should we check for metadata file too? 
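    // The file is looked up in volumeMap under the dataset lock, but the
    // existence check happens outside of it. If the map says a file should
    // exist and it does not, a disk failure is suspected, so an
    // asynchronous disk check is triggered before returning null.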
1759 final File f; 1760 synchronized(this) { 1761 f = getFile(bpid, blockId, false); 1762 } 1763 1764 if(f != null ) { 1765 if(f.exists()) 1766 return f; 1767 1768 // if file is not null, but doesn't exist - possibly disk failed 1769 datanode.checkDiskErrorAsync(); 1770 } 1771 1772 if (LOG.isDebugEnabled()) { 1773 LOG.debug("blockId=" + blockId + ", f=" + f); 1774 } 1775 return null; 1776 } 1777 1778 /** Check the files of a replica. */ checkReplicaFiles(final ReplicaInfo r)1779 static void checkReplicaFiles(final ReplicaInfo r) throws IOException { 1780 //check replica's file 1781 final File f = r.getBlockFile(); 1782 if (!f.exists()) { 1783 throw new FileNotFoundException("File " + f + " not found, r=" + r); 1784 } 1785 if (r.getBytesOnDisk() != f.length()) { 1786 throw new IOException("File length mismatched. The length of " 1787 + f + " is " + f.length() + " but r=" + r); 1788 } 1789 1790 //check replica's meta file 1791 final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp()); 1792 if (!metafile.exists()) { 1793 throw new IOException("Metafile " + metafile + " does not exist, r=" + r); 1794 } 1795 if (metafile.length() == 0) { 1796 throw new IOException("Metafile " + metafile + " is empty, r=" + r); 1797 } 1798 } 1799 1800 /** 1801 * We're informed that a block is no longer valid. We 1802 * could lazily garbage-collect the block, but why bother? 1803 * just get rid of it. 1804 */ 1805 @Override // FsDatasetSpi invalidate(String bpid, Block invalidBlks[])1806 public void invalidate(String bpid, Block invalidBlks[]) throws IOException { 1807 final List<String> errors = new ArrayList<String>(); 1808 for (int i = 0; i < invalidBlks.length; i++) { 1809 final File f; 1810 final FsVolumeImpl v; 1811 synchronized (this) { 1812 final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); 1813 if (info == null) { 1814 // It is okay if the block is not found -- it may be deleted earlier. 1815 LOG.info("Failed to delete replica " + invalidBlks[i] 1816 + ": ReplicaInfo not found."); 1817 continue; 1818 } 1819 if (info.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) { 1820 errors.add("Failed to delete replica " + invalidBlks[i] 1821 + ": GenerationStamp not matched, info=" + info); 1822 continue; 1823 } 1824 f = info.getBlockFile(); 1825 v = (FsVolumeImpl)info.getVolume(); 1826 if (v == null) { 1827 errors.add("Failed to delete replica " + invalidBlks[i] 1828 + ". No volume for this replica, file=" + f); 1829 continue; 1830 } 1831 File parent = f.getParentFile(); 1832 if (parent == null) { 1833 errors.add("Failed to delete replica " + invalidBlks[i] 1834 + ". Parent not found for file " + f); 1835 continue; 1836 } 1837 ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]); 1838 addDeletingBlock(bpid, removing.getBlockId()); 1839 if (LOG.isDebugEnabled()) { 1840 LOG.debug("Block file " + removing.getBlockFile().getName() 1841 + " is to be deleted"); 1842 } 1843 } 1844 1845 if (v.isTransientStorage()) { 1846 RamDiskReplica replicaInfo = 1847 ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId()); 1848 if (replicaInfo != null) { 1849 if (!replicaInfo.getIsPersisted()) { 1850 datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted(); 1851 } 1852 ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(), 1853 replicaInfo.getBlockId(), true); 1854 } 1855 } 1856 1857 // If a DFSClient has the replica in its cache of short-circuit file 1858 // descriptors (and the client is using ShortCircuitShm), invalidate it. 
1859 datanode.getShortCircuitRegistry().processBlockInvalidation( 1860 new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid)); 1861 1862 // If the block is cached, start uncaching it. 1863 cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); 1864 1865 // Delete the block asynchronously to make sure we can do it fast enough. 1866 // It's ok to unlink the block file before the uncache operation 1867 // finishes. 1868 try { 1869 asyncDiskService.deleteAsync(v.obtainReference(), f, 1870 FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()), 1871 new ExtendedBlock(bpid, invalidBlks[i]), 1872 dataStorage.getTrashDirectoryForBlockFile(bpid, f)); 1873 } catch (ClosedChannelException e) { 1874 LOG.warn("Volume " + v + " is closed, ignore the deletion task for " + 1875 "block " + invalidBlks[i]); 1876 } 1877 } 1878 if (!errors.isEmpty()) { 1879 StringBuilder b = new StringBuilder("Failed to delete ") 1880 .append(errors.size()).append(" (out of ").append(invalidBlks.length) 1881 .append(") replica(s):"); 1882 for(int i = 0; i < errors.size(); i++) { 1883 b.append("\n").append(i).append(") ").append(errors.get(i)); 1884 } 1885 throw new IOException(b.toString()); 1886 } 1887 } 1888 1889 /** 1890 * Invalidate a block but does not delete the actual on-disk block file. 1891 * 1892 * It should only be used when deactivating disks. 1893 * 1894 * @param bpid the block pool ID. 1895 * @param block The block to be invalidated. 1896 */ invalidate(String bpid, ReplicaInfo block)1897 public void invalidate(String bpid, ReplicaInfo block) { 1898 // If a DFSClient has the replica in its cache of short-circuit file 1899 // descriptors (and the client is using ShortCircuitShm), invalidate it. 1900 datanode.getShortCircuitRegistry().processBlockInvalidation( 1901 new ExtendedBlockId(block.getBlockId(), bpid)); 1902 1903 // If the block is cached, start uncaching it. 1904 cacheManager.uncacheBlock(bpid, block.getBlockId()); 1905 1906 datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block), 1907 block.getStorageUuid()); 1908 } 1909 1910 /** 1911 * Asynchronously attempts to cache a single block via {@link FsDatasetCache}. 
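 * The replica is validated under the dataset lock first: it must exist,
 * be FINALIZED, and live on a non-transient FsVolumeImpl. The actual
 * caching work is then handed to the cache manager on the volume's cache
 * executor, outside the lock.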
1912 */ cacheBlock(String bpid, long blockId)1913 private void cacheBlock(String bpid, long blockId) { 1914 FsVolumeImpl volume; 1915 String blockFileName; 1916 long length, genstamp; 1917 Executor volumeExecutor; 1918 1919 synchronized (this) { 1920 ReplicaInfo info = volumeMap.get(bpid, blockId); 1921 boolean success = false; 1922 try { 1923 if (info == null) { 1924 LOG.warn("Failed to cache block with id " + blockId + ", pool " + 1925 bpid + ": ReplicaInfo not found."); 1926 return; 1927 } 1928 if (info.getState() != ReplicaState.FINALIZED) { 1929 LOG.warn("Failed to cache block with id " + blockId + ", pool " + 1930 bpid + ": replica is not finalized; it is in state " + 1931 info.getState()); 1932 return; 1933 } 1934 try { 1935 volume = (FsVolumeImpl)info.getVolume(); 1936 if (volume == null) { 1937 LOG.warn("Failed to cache block with id " + blockId + ", pool " + 1938 bpid + ": volume not found."); 1939 return; 1940 } 1941 } catch (ClassCastException e) { 1942 LOG.warn("Failed to cache block with id " + blockId + 1943 ": volume was not an instance of FsVolumeImpl."); 1944 return; 1945 } 1946 if (volume.isTransientStorage()) { 1947 LOG.warn("Caching not supported on block with id " + blockId + 1948 " since the volume is backed by RAM."); 1949 return; 1950 } 1951 success = true; 1952 } finally { 1953 if (!success) { 1954 cacheManager.numBlocksFailedToCache.incrementAndGet(); 1955 } 1956 } 1957 blockFileName = info.getBlockFile().getAbsolutePath(); 1958 length = info.getVisibleLength(); 1959 genstamp = info.getGenerationStamp(); 1960 volumeExecutor = volume.getCacheExecutor(); 1961 } 1962 cacheManager.cacheBlock(blockId, bpid, 1963 blockFileName, length, genstamp, volumeExecutor); 1964 } 1965 1966 @Override // FsDatasetSpi cache(String bpid, long[] blockIds)1967 public void cache(String bpid, long[] blockIds) { 1968 for (int i=0; i < blockIds.length; i++) { 1969 cacheBlock(bpid, blockIds[i]); 1970 } 1971 } 1972 1973 @Override // FsDatasetSpi uncache(String bpid, long[] blockIds)1974 public void uncache(String bpid, long[] blockIds) { 1975 for (int i=0; i < blockIds.length; i++) { 1976 cacheManager.uncacheBlock(bpid, blockIds[i]); 1977 } 1978 } 1979 1980 @Override isCached(String bpid, long blockId)1981 public boolean isCached(String bpid, long blockId) { 1982 return cacheManager.isCached(bpid, blockId); 1983 } 1984 1985 @Override // FsDatasetSpi contains(final ExtendedBlock block)1986 public synchronized boolean contains(final ExtendedBlock block) { 1987 final long blockId = block.getLocalBlock().getBlockId(); 1988 return getFile(block.getBlockPoolId(), blockId, false) != null; 1989 } 1990 1991 /** 1992 * Turn the block identifier into a filename 1993 * @param bpid Block pool Id 1994 * @param blockId a block's id 1995 * @return on disk data file path; null if the replica does not exist 1996 */ getFile(final String bpid, final long blockId, boolean touch)1997 File getFile(final String bpid, final long blockId, boolean touch) { 1998 ReplicaInfo info = volumeMap.get(bpid, blockId); 1999 if (info != null) { 2000 if (touch && info.getVolume().isTransientStorage()) { 2001 ramDiskReplicaTracker.touch(bpid, blockId); 2002 datanode.getMetrics().incrRamDiskBlocksReadHits(); 2003 } 2004 return info.getBlockFile(); 2005 } 2006 return null; 2007 } 2008 2009 /** 2010 * check if a data directory is healthy 2011 * 2012 * if some volumes failed - the caller must emove all the blocks that belong 2013 * to these failed volumes. 2014 * @return the failed volumes. Returns null if no volume failed. 
2015 */ 2016 @Override // FsDatasetSpi checkDataDir()2017 public Set<File> checkDataDir() { 2018 return volumes.checkDirs(); 2019 } 2020 2021 2022 @Override // FsDatasetSpi toString()2023 public String toString() { 2024 return "FSDataset{dirpath='"+volumes+"'}"; 2025 } 2026 2027 private ObjectName mbeanName; 2028 2029 /** 2030 * Register the FSDataset MBean using the name 2031 * "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>" 2032 */ registerMBean(final String datanodeUuid)2033 void registerMBean(final String datanodeUuid) { 2034 // We wrap to bypass standard MBean naming convention. 2035 // This wrapping can be removed in java 6 as it is more flexible in 2036 // package naming for mbeans and their impl. 2037 try { 2038 StandardMBean bean = new StandardMBean(this,FSDatasetMBean.class); 2039 mbeanName = MBeans.register("DataNode", "FSDatasetState-" + datanodeUuid, bean); 2040 } catch (NotCompliantMBeanException e) { 2041 LOG.warn("Error registering FSDatasetState MBean", e); 2042 } 2043 LOG.info("Registered FSDatasetState MBean"); 2044 } 2045 2046 @Override // FsDatasetSpi shutdown()2047 public void shutdown() { 2048 fsRunning = false; 2049 2050 ((LazyWriter) lazyWriter.getRunnable()).stop(); 2051 lazyWriter.interrupt(); 2052 2053 if (mbeanName != null) { 2054 MBeans.unregister(mbeanName); 2055 } 2056 2057 if (asyncDiskService != null) { 2058 asyncDiskService.shutdown(); 2059 } 2060 2061 if (asyncLazyPersistService != null) { 2062 asyncLazyPersistService.shutdown(); 2063 } 2064 2065 if(volumes != null) { 2066 volumes.shutdown(); 2067 } 2068 2069 try { 2070 lazyWriter.join(); 2071 } catch (InterruptedException ie) { 2072 LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " + 2073 "from LazyWriter.join"); 2074 } 2075 } 2076 2077 @Override // FSDatasetMBean getStorageInfo()2078 public String getStorageInfo() { 2079 return toString(); 2080 } 2081 2082 /** 2083 * Reconcile the difference between blocks on the disk and blocks in 2084 * volumeMap 2085 * 2086 * Check the given block for inconsistencies.
Look at the 2087 * current state of the block and reconcile the differences as follows: 2088 * <ul> 2089 * <li>If the block file is missing, delete the block from volumeMap</li> 2090 * <li>If the block file exists and the block is missing in volumeMap, 2091 * add the block to volumeMap <li> 2092 * <li>If generation stamp does not match, then update the block with right 2093 * generation stamp</li> 2094 * <li>If the block length in memory does not match the actual block file length 2095 * then mark the block as corrupt and update the block length in memory</li> 2096 * <li>If the file in {@link ReplicaInfo} does not match the file on 2097 * the disk, update {@link ReplicaInfo} with the correct file</li> 2098 * </ul> 2099 * 2100 * @param blockId Block that differs 2101 * @param diskFile Block file on the disk 2102 * @param diskMetaFile Metadata file from on the disk 2103 * @param vol Volume of the block file 2104 */ 2105 @Override checkAndUpdate(String bpid, long blockId, File diskFile, File diskMetaFile, FsVolumeSpi vol)2106 public void checkAndUpdate(String bpid, long blockId, File diskFile, 2107 File diskMetaFile, FsVolumeSpi vol) throws IOException { 2108 Block corruptBlock = null; 2109 ReplicaInfo memBlockInfo; 2110 synchronized (this) { 2111 memBlockInfo = volumeMap.get(bpid, blockId); 2112 if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { 2113 // Block is not finalized - ignore the difference 2114 return; 2115 } 2116 2117 final long diskGS = diskMetaFile != null && diskMetaFile.exists() ? 2118 Block.getGenerationStamp(diskMetaFile.getName()) : 2119 GenerationStamp.GRANDFATHER_GENERATION_STAMP; 2120 2121 if (diskFile == null || !diskFile.exists()) { 2122 if (memBlockInfo == null) { 2123 // Block file does not exist and block does not exist in memory 2124 // If metadata file exists then delete it 2125 if (diskMetaFile != null && diskMetaFile.exists() 2126 && diskMetaFile.delete()) { 2127 LOG.warn("Deleted a metadata file without a block " 2128 + diskMetaFile.getAbsolutePath()); 2129 } 2130 return; 2131 } 2132 if (!memBlockInfo.getBlockFile().exists()) { 2133 // Block is in memory and not on the disk 2134 // Remove the block from volumeMap 2135 volumeMap.remove(bpid, blockId); 2136 if (vol.isTransientStorage()) { 2137 ramDiskReplicaTracker.discardReplica(bpid, blockId, true); 2138 } 2139 LOG.warn("Removed block " + blockId 2140 + " from memory with missing block file on the disk"); 2141 // Finally remove the metadata file 2142 if (diskMetaFile != null && diskMetaFile.exists() 2143 && diskMetaFile.delete()) { 2144 LOG.warn("Deleted a metadata file for the deleted block " 2145 + diskMetaFile.getAbsolutePath()); 2146 } 2147 } 2148 return; 2149 } 2150 /* 2151 * Block file exists on the disk 2152 */ 2153 if (memBlockInfo == null) { 2154 // Block is missing in memory - add the block to volumeMap 2155 ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 2156 diskFile.length(), diskGS, vol, diskFile.getParentFile()); 2157 volumeMap.add(bpid, diskBlockInfo); 2158 if (vol.isTransientStorage()) { 2159 ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); 2160 } 2161 LOG.warn("Added missing block to memory " + diskBlockInfo); 2162 return; 2163 } 2164 /* 2165 * Block exists in volumeMap and the block file exists on the disk 2166 */ 2167 // Compare block files 2168 File memFile = memBlockInfo.getBlockFile(); 2169 if (memFile.exists()) { 2170 if (memFile.compareTo(diskFile) != 0) { 2171 if (diskMetaFile.exists()) { 2172 if 
(memBlockInfo.getMetaFile().exists()) { 2173 // We have two sets of block+meta files. Decide which one to 2174 // keep. 2175 ReplicaInfo diskBlockInfo = new FinalizedReplica( 2176 blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile()); 2177 ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas( 2178 memBlockInfo, diskBlockInfo, volumeMap); 2179 } 2180 } else { 2181 if (!diskFile.delete()) { 2182 LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan"); 2183 } 2184 } 2185 } 2186 } else { 2187 // Block refers to a block file that does not exist. 2188 // Update the block with the file found on the disk. Since the block 2189 // file and metadata file are found as a pair on the disk, update 2190 // the block based on the metadata file found on the disk 2191 LOG.warn("Block file in volumeMap " 2192 + memFile.getAbsolutePath() 2193 + " does not exist. Updating it to the file found during scan " 2194 + diskFile.getAbsolutePath()); 2195 memBlockInfo.setDir(diskFile.getParentFile()); 2196 memFile = diskFile; 2197 2198 LOG.warn("Updating generation stamp for block " + blockId 2199 + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS); 2200 memBlockInfo.setGenerationStamp(diskGS); 2201 } 2202 2203 // Compare generation stamp 2204 if (memBlockInfo.getGenerationStamp() != diskGS) { 2205 File memMetaFile = FsDatasetUtil.getMetaFile(diskFile, 2206 memBlockInfo.getGenerationStamp()); 2207 if (memMetaFile.exists()) { 2208 if (memMetaFile.compareTo(diskMetaFile) != 0) { 2209 LOG.warn("Metadata file in memory " 2210 + memMetaFile.getAbsolutePath() 2211 + " does not match file found by scan " 2212 + (diskMetaFile == null? null: diskMetaFile.getAbsolutePath())); 2213 } 2214 } else { 2215 // Metadata file corresponding to block in memory is missing 2216 // If metadata file found during the scan is on the same directory 2217 // as the block file, then use the generation stamp from it 2218 long gs = diskMetaFile != null && diskMetaFile.exists() 2219 && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS 2220 : GenerationStamp.GRANDFATHER_GENERATION_STAMP; 2221 2222 LOG.warn("Updating generation stamp for block " + blockId 2223 + " from " + memBlockInfo.getGenerationStamp() + " to " + gs); 2224 2225 memBlockInfo.setGenerationStamp(gs); 2226 } 2227 } 2228 2229 // Compare block size 2230 if (memBlockInfo.getNumBytes() != memFile.length()) { 2231 // Update the length based on the block file 2232 corruptBlock = new Block(memBlockInfo); 2233 LOG.warn("Updating size of block " + blockId + " from " 2234 + memBlockInfo.getNumBytes() + " to " + memFile.length()); 2235 memBlockInfo.setNumBytes(memFile.length()); 2236 } 2237 } 2238 2239 // Send corrupt block report outside the lock 2240 if (corruptBlock != null) { 2241 LOG.warn("Reporting the block " + corruptBlock 2242 + " as corrupt due to length mismatch"); 2243 try { 2244 datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock)); 2245 } catch (IOException e) { 2246 LOG.warn("Failed to repot bad block " + corruptBlock, e); 2247 } 2248 } 2249 } 2250 2251 /** 2252 * @deprecated use {@link #fetchReplicaInfo(String, long)} instead. 
2253 */ 2254 @Override // FsDatasetSpi 2255 @Deprecated getReplica(String bpid, long blockId)2256 public ReplicaInfo getReplica(String bpid, long blockId) { 2257 return volumeMap.get(bpid, blockId); 2258 } 2259 2260 @Override getReplicaString(String bpid, long blockId)2261 public synchronized String getReplicaString(String bpid, long blockId) { 2262 final Replica r = volumeMap.get(bpid, blockId); 2263 return r == null? "null": r.toString(); 2264 } 2265 2266 @Override // FsDatasetSpi initReplicaRecovery( RecoveringBlock rBlock)2267 public synchronized ReplicaRecoveryInfo initReplicaRecovery( 2268 RecoveringBlock rBlock) throws IOException { 2269 return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap, 2270 rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(), 2271 datanode.getDnConf().getXceiverStopTimeout()); 2272 } 2273 2274 /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */ initReplicaRecovery(String bpid, ReplicaMap map, Block block, long recoveryId, long xceiverStopTimeout)2275 static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, 2276 Block block, long recoveryId, long xceiverStopTimeout) throws IOException { 2277 final ReplicaInfo replica = map.get(bpid, block.getBlockId()); 2278 LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId 2279 + ", replica=" + replica); 2280 2281 //check replica 2282 if (replica == null) { 2283 return null; 2284 } 2285 2286 //stop writer if there is any 2287 if (replica instanceof ReplicaInPipeline) { 2288 final ReplicaInPipeline rip = (ReplicaInPipeline)replica; 2289 rip.stopWriter(xceiverStopTimeout); 2290 2291 //check replica bytes on disk. 2292 if (rip.getBytesOnDisk() < rip.getVisibleLength()) { 2293 throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:" 2294 + " getBytesOnDisk() < getVisibleLength(), rip=" + rip); 2295 } 2296 2297 //check the replica's files 2298 checkReplicaFiles(rip); 2299 } 2300 2301 //check generation stamp 2302 if (replica.getGenerationStamp() < block.getGenerationStamp()) { 2303 throw new IOException( 2304 "replica.getGenerationStamp() < block.getGenerationStamp(), block=" 2305 + block + ", replica=" + replica); 2306 } 2307 2308 //check recovery id 2309 if (replica.getGenerationStamp() >= recoveryId) { 2310 throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:" 2311 + " replica.getGenerationStamp() >= recoveryId = " + recoveryId 2312 + ", block=" + block + ", replica=" + replica); 2313 } 2314 2315 //check RUR 2316 final ReplicaUnderRecovery rur; 2317 if (replica.getState() == ReplicaState.RUR) { 2318 rur = (ReplicaUnderRecovery)replica; 2319 if (rur.getRecoveryID() >= recoveryId) { 2320 throw new RecoveryInProgressException( 2321 "rur.getRecoveryID() >= recoveryId = " + recoveryId 2322 + ", block=" + block + ", rur=" + rur); 2323 } 2324 final long oldRecoveryID = rur.getRecoveryID(); 2325 rur.setRecoveryID(recoveryId); 2326 LOG.info("initReplicaRecovery: update recovery id for " + block 2327 + " from " + oldRecoveryID + " to " + recoveryId); 2328 } 2329 else { 2330 rur = new ReplicaUnderRecovery(replica, recoveryId); 2331 map.add(bpid, rur); 2332 LOG.info("initReplicaRecovery: changing replica state for " 2333 + block + " from " + replica.getState() 2334 + " to " + rur.getState()); 2335 } 2336 return rur.createInfo(); 2337 } 2338 2339 @Override // FsDatasetSpi updateReplicaUnderRecovery( final ExtendedBlock oldBlock, final long recoveryId, final long newBlockId, final long newlength)2340 public synchronized String 
updateReplicaUnderRecovery( 2341 final ExtendedBlock oldBlock, 2342 final long recoveryId, 2343 final long newBlockId, 2344 final long newlength) throws IOException { 2345 //get replica 2346 final String bpid = oldBlock.getBlockPoolId(); 2347 final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); 2348 LOG.info("updateReplica: " + oldBlock 2349 + ", recoveryId=" + recoveryId 2350 + ", length=" + newlength 2351 + ", replica=" + replica); 2352 2353 //check replica 2354 if (replica == null) { 2355 throw new ReplicaNotFoundException(oldBlock); 2356 } 2357 2358 //check replica state 2359 if (replica.getState() != ReplicaState.RUR) { 2360 throw new IOException("replica.getState() != " + ReplicaState.RUR 2361 + ", replica=" + replica); 2362 } 2363 2364 //check replica's byte on disk 2365 if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) { 2366 throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:" 2367 + " replica.getBytesOnDisk() != block.getNumBytes(), block=" 2368 + oldBlock + ", replica=" + replica); 2369 } 2370 2371 //check replica files before update 2372 checkReplicaFiles(replica); 2373 2374 //update replica 2375 final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock 2376 .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, 2377 newBlockId, newlength); 2378 2379 boolean copyTruncate = newBlockId != oldBlock.getBlockId(); 2380 if(!copyTruncate) { 2381 assert finalized.getBlockId() == oldBlock.getBlockId() 2382 && finalized.getGenerationStamp() == recoveryId 2383 && finalized.getNumBytes() == newlength 2384 : "Replica information mismatched: oldBlock=" + oldBlock 2385 + ", recoveryId=" + recoveryId + ", newlength=" + newlength 2386 + ", newBlockId=" + newBlockId + ", finalized=" + finalized; 2387 } else { 2388 assert finalized.getBlockId() == oldBlock.getBlockId() 2389 && finalized.getGenerationStamp() == oldBlock.getGenerationStamp() 2390 && finalized.getNumBytes() == oldBlock.getNumBytes() 2391 : "Finalized and old information mismatched: oldBlock=" + oldBlock 2392 + ", genStamp=" + oldBlock.getGenerationStamp() 2393 + ", len=" + oldBlock.getNumBytes() 2394 + ", finalized=" + finalized; 2395 } 2396 2397 //check replica files after update 2398 checkReplicaFiles(finalized); 2399 2400 //return storage ID 2401 return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID(); 2402 } 2403 updateReplicaUnderRecovery( String bpid, ReplicaUnderRecovery rur, long recoveryId, long newBlockId, long newlength)2404 private FinalizedReplica updateReplicaUnderRecovery( 2405 String bpid, 2406 ReplicaUnderRecovery rur, 2407 long recoveryId, 2408 long newBlockId, 2409 long newlength) throws IOException { 2410 //check recovery id 2411 if (rur.getRecoveryID() != recoveryId) { 2412 throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId 2413 + ", rur=" + rur); 2414 } 2415 2416 boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId; 2417 File blockFile; 2418 File metaFile; 2419 // bump rur's GS to be recovery id 2420 if(!copyOnTruncate) { 2421 bumpReplicaGS(rur, recoveryId); 2422 blockFile = rur.getBlockFile(); 2423 metaFile = rur.getMetaFile(); 2424 } else { 2425 File[] copiedReplicaFiles = 2426 copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId); 2427 blockFile = copiedReplicaFiles[1]; 2428 metaFile = copiedReplicaFiles[0]; 2429 } 2430 2431 //update length 2432 if (rur.getNumBytes() < newlength) { 2433 throw new IOException("rur.getNumBytes() < newlength = " + newlength 2434 + ", rur=" + rur); 2435 } 2436 if 
(rur.getNumBytes() > newlength) { 2437 rur.unlinkBlock(1); 2438 truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength); 2439 if(!copyOnTruncate) { 2440 // update RUR with the new length 2441 rur.setNumBytes(newlength); 2442 } else { 2443 // Copying block to a new block with new blockId. 2444 // Not truncating original block. 2445 ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten( 2446 newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(), 2447 newlength); 2448 newReplicaInfo.setNumBytes(newlength); 2449 volumeMap.add(bpid, newReplicaInfo); 2450 finalizeReplica(bpid, newReplicaInfo); 2451 } 2452 } 2453 2454 // finalize the block 2455 return finalizeReplica(bpid, rur); 2456 } 2457 copyReplicaWithNewBlockIdAndGS( ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)2458 private File[] copyReplicaWithNewBlockIdAndGS( 2459 ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS) 2460 throws IOException { 2461 String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId; 2462 FsVolumeReference v = volumes.getNextVolume( 2463 replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes()); 2464 final File tmpDir = ((FsVolumeImpl) v.getVolume()) 2465 .getBlockPoolSlice(bpid).getTmpDir(); 2466 final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId); 2467 final File dstBlockFile = new File(destDir, blockFileName); 2468 final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS); 2469 return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), 2470 dstMetaFile, dstBlockFile, true); 2471 } 2472 2473 @Override // FsDatasetSpi getReplicaVisibleLength(final ExtendedBlock block)2474 public synchronized long getReplicaVisibleLength(final ExtendedBlock block) 2475 throws IOException { 2476 final Replica replica = getReplicaInfo(block.getBlockPoolId(), 2477 block.getBlockId()); 2478 if (replica.getGenerationStamp() < block.getGenerationStamp()) { 2479 throw new IOException( 2480 "replica.getGenerationStamp() < block.getGenerationStamp(), block=" 2481 + block + ", replica=" + replica); 2482 } 2483 return replica.getVisibleLength(); 2484 } 2485 2486 @Override addBlockPool(String bpid, Configuration conf)2487 public void addBlockPool(String bpid, Configuration conf) 2488 throws IOException { 2489 LOG.info("Adding block pool " + bpid); 2490 synchronized(this) { 2491 volumes.addBlockPool(bpid, conf); 2492 volumeMap.initBlockPool(bpid); 2493 } 2494 volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker); 2495 } 2496 2497 @Override shutdownBlockPool(String bpid)2498 public synchronized void shutdownBlockPool(String bpid) { 2499 LOG.info("Removing block pool " + bpid); 2500 volumeMap.cleanUpBlockPool(bpid); 2501 volumes.removeBlockPool(bpid); 2502 } 2503 2504 /** 2505 * Class for representing the Datanode volume information 2506 */ 2507 private static class VolumeInfo { 2508 final String directory; 2509 final long usedSpace; // size of space used by HDFS 2510 final long freeSpace; // size of free space excluding reserved space 2511 final long reservedSpace; // size of space reserved for non-HDFS and RBW 2512 VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace)2513 VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) { 2514 this.directory = v.toString(); 2515 this.usedSpace = usedSpace; 2516 this.freeSpace = freeSpace; 2517 this.reservedSpace = v.getReserved(); 2518 } 2519 } 2520 getVolumeInfo()2521 private Collection<VolumeInfo> getVolumeInfo() { 2522 Collection<VolumeInfo> info = new 
ArrayList<VolumeInfo>(); 2523 for (FsVolumeImpl volume : getVolumes()) { 2524 long used = 0; 2525 long free = 0; 2526 try (FsVolumeReference ref = volume.obtainReference()) { 2527 used = volume.getDfsUsed(); 2528 free = volume.getAvailable(); 2529 } catch (ClosedChannelException e) { 2530 continue; 2531 } catch (IOException e) { 2532 LOG.warn(e.getMessage()); 2533 used = 0; 2534 free = 0; 2535 } 2536 2537 info.add(new VolumeInfo(volume, used, free)); 2538 } 2539 return info; 2540 } 2541 2542 @Override getVolumeInfoMap()2543 public Map<String, Object> getVolumeInfoMap() { 2544 final Map<String, Object> info = new HashMap<String, Object>(); 2545 Collection<VolumeInfo> volumes = getVolumeInfo(); 2546 for (VolumeInfo v : volumes) { 2547 final Map<String, Object> innerInfo = new HashMap<String, Object>(); 2548 innerInfo.put("usedSpace", v.usedSpace); 2549 innerInfo.put("freeSpace", v.freeSpace); 2550 innerInfo.put("reservedSpace", v.reservedSpace); 2551 info.put(v.directory, innerInfo); 2552 } 2553 return info; 2554 } 2555 2556 @Override //FsDatasetSpi deleteBlockPool(String bpid, boolean force)2557 public synchronized void deleteBlockPool(String bpid, boolean force) 2558 throws IOException { 2559 List<FsVolumeImpl> curVolumes = getVolumes(); 2560 if (!force) { 2561 for (FsVolumeImpl volume : curVolumes) { 2562 try (FsVolumeReference ref = volume.obtainReference()) { 2563 if (!volume.isBPDirEmpty(bpid)) { 2564 LOG.warn(bpid + " has some block files, cannot delete unless forced"); 2565 throw new IOException("Cannot delete block pool, " 2566 + "it contains some block files"); 2567 } 2568 } catch (ClosedChannelException e) { 2569 // ignore. 2570 } 2571 } 2572 } 2573 for (FsVolumeImpl volume : curVolumes) { 2574 try (FsVolumeReference ref = volume.obtainReference()) { 2575 volume.deleteBPDirectories(bpid, force); 2576 } catch (ClosedChannelException e) { 2577 // ignore. 
2578 } 2579 } 2580 } 2581 2582 @Override // FsDatasetSpi getBlockLocalPathInfo(ExtendedBlock block)2583 public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) 2584 throws IOException { 2585 synchronized(this) { 2586 final Replica replica = volumeMap.get(block.getBlockPoolId(), 2587 block.getBlockId()); 2588 if (replica == null) { 2589 throw new ReplicaNotFoundException(block); 2590 } 2591 if (replica.getGenerationStamp() < block.getGenerationStamp()) { 2592 throw new IOException( 2593 "Replica generation stamp < block generation stamp, block=" 2594 + block + ", replica=" + replica); 2595 } else if (replica.getGenerationStamp() > block.getGenerationStamp()) { 2596 block.setGenerationStamp(replica.getGenerationStamp()); 2597 } 2598 } 2599 2600 File datafile = getBlockFile(block); 2601 File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp()); 2602 BlockLocalPathInfo info = new BlockLocalPathInfo(block, 2603 datafile.getAbsolutePath(), metafile.getAbsolutePath()); 2604 return info; 2605 } 2606 2607 @Override // FsDatasetSpi getHdfsBlocksMetadata(String poolId, long[] blockIds)2608 public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, 2609 long[] blockIds) throws IOException { 2610 List<FsVolumeImpl> curVolumes = getVolumes(); 2611 // List of VolumeIds, one per volume on the datanode 2612 List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size()); 2613 // List of indexes into the list of VolumeIds, pointing at the VolumeId of 2614 // the volume that the block is on 2615 List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length); 2616 // Initialize the list of VolumeIds simply by enumerating the volumes 2617 for (int i = 0; i < curVolumes.size(); i++) { 2618 blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); 2619 } 2620 // Determine the index of the VolumeId of each block's volume, by comparing 2621 // the block's volume against the enumerated volumes 2622 for (int i = 0; i < blockIds.length; i++) { 2623 long blockId = blockIds[i]; 2624 boolean isValid = false; 2625 2626 ReplicaInfo info = volumeMap.get(poolId, blockId); 2627 int volumeIndex = 0; 2628 if (info != null) { 2629 FsVolumeSpi blockVolume = info.getVolume(); 2630 for (FsVolumeImpl volume : curVolumes) { 2631 // This comparison of references should be safe 2632 if (blockVolume == volume) { 2633 isValid = true; 2634 break; 2635 } 2636 volumeIndex++; 2637 } 2638 } 2639 // Indicates that the block is not present, or not found in a data dir 2640 if (!isValid) { 2641 volumeIndex = Integer.MAX_VALUE; 2642 } 2643 blocksVolumeIndexes.add(volumeIndex); 2644 } 2645 return new HdfsBlocksMetadata(poolId, blockIds, 2646 blocksVolumeIds, blocksVolumeIndexes); 2647 } 2648 2649 @Override enableTrash(String bpid)2650 public void enableTrash(String bpid) { 2651 dataStorage.enableTrash(bpid); 2652 } 2653 2654 @Override clearTrash(String bpid)2655 public void clearTrash(String bpid) { 2656 dataStorage.clearTrash(bpid); 2657 } 2658 2659 @Override trashEnabled(String bpid)2660 public boolean trashEnabled(String bpid) { 2661 return dataStorage.trashEnabled(bpid); 2662 } 2663 2664 @Override setRollingUpgradeMarker(String bpid)2665 public void setRollingUpgradeMarker(String bpid) throws IOException { 2666 dataStorage.setRollingUpgradeMarker(bpid); 2667 } 2668 2669 @Override clearRollingUpgradeMarker(String bpid)2670 public void clearRollingUpgradeMarker(String bpid) throws IOException { 2671 dataStorage.clearRollingUpgradeMarker(bpid); 2672 } 2673 2674 2675 @Override 
onCompleteLazyPersist(String bpId, long blockId, long creationTime, File[] savedFiles, FsVolumeImpl targetVolume)2676 public void onCompleteLazyPersist(String bpId, long blockId, 2677 long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) { 2678 synchronized (FsDatasetImpl.this) { 2679 ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles); 2680 2681 targetVolume.incDfsUsed(bpId, 2682 savedFiles[0].length() + savedFiles[1].length()); 2683 2684 // Update metrics (ignore the metadata file size) 2685 datanode.getMetrics().incrRamDiskBlocksLazyPersisted(); 2686 datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length()); 2687 datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs( 2688 Time.monotonicNow() - creationTime); 2689 2690 if (LOG.isDebugEnabled()) { 2691 LOG.debug("LazyWriter: Finish persisting RamDisk block: " 2692 + " block pool Id: " + bpId + " block id: " + blockId 2693 + " to block file " + savedFiles[1] + " and meta file " + savedFiles[0] 2694 + " on target volume " + targetVolume); 2695 } 2696 } 2697 } 2698 2699 @Override onFailLazyPersist(String bpId, long blockId)2700 public void onFailLazyPersist(String bpId, long blockId) { 2701 RamDiskReplica block = null; 2702 block = ramDiskReplicaTracker.getReplica(bpId, blockId); 2703 if (block != null) { 2704 LOG.warn("Failed to save replica " + block + ". re-enqueueing it."); 2705 ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block); 2706 } 2707 } 2708 2709 @Override submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags)2710 public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, 2711 FileDescriptor fd, long offset, long nbytes, int flags) { 2712 FsVolumeImpl fsVolumeImpl = this.getVolume(block); 2713 asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, 2714 nbytes, flags); 2715 } 2716 ramDiskConfigured()2717 private boolean ramDiskConfigured() { 2718 for (FsVolumeImpl v: getVolumes()){ 2719 if (v.isTransientStorage()) { 2720 return true; 2721 } 2722 } 2723 return false; 2724 } 2725 2726 // Add/Remove per DISK volume async lazy persist thread when RamDisk volume is 2727 // added or removed. 2728 // This should only be called when the FsDataSetImpl#volumes list is finalized. 
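  // Note that transient (RAM_DISK) volumes themselves never get a lazy
  // persist worker; the workers are attached to the DISK volumes that
  // receive the persisted copies.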
setupAsyncLazyPersistThreads()2729 private void setupAsyncLazyPersistThreads() { 2730 for (FsVolumeImpl v: getVolumes()){ 2731 setupAsyncLazyPersistThread(v); 2732 } 2733 } 2734 setupAsyncLazyPersistThread(final FsVolumeImpl v)2735 private void setupAsyncLazyPersistThread(final FsVolumeImpl v) { 2736 // Skip transient volumes 2737 if (v.isTransientStorage()) { 2738 return; 2739 } 2740 boolean ramDiskConfigured = ramDiskConfigured(); 2741 // Add thread for DISK volume if RamDisk is configured 2742 if (ramDiskConfigured && 2743 !asyncLazyPersistService.queryVolume(v.getCurrentDir())) { 2744 asyncLazyPersistService.addVolume(v.getCurrentDir()); 2745 } 2746 2747 // Remove thread for DISK volume if RamDisk is not configured 2748 if (!ramDiskConfigured && 2749 asyncLazyPersistService.queryVolume(v.getCurrentDir())) { 2750 asyncLazyPersistService.removeVolume(v.getCurrentDir()); 2751 } 2752 } 2753 removeOldReplica(ReplicaInfo replicaInfo, ReplicaInfo newReplicaInfo, File blockFile, File metaFile, long blockFileUsed, long metaFileUsed, final String bpid)2754 private void removeOldReplica(ReplicaInfo replicaInfo, 2755 ReplicaInfo newReplicaInfo, File blockFile, File metaFile, 2756 long blockFileUsed, long metaFileUsed, final String bpid) { 2757 // Before deleting the files from old storage we must notify the 2758 // NN that the files are on the new storage. Else a blockReport from 2759 // the transient storage might cause the NN to think the blocks are lost. 2760 // Replicas must be evicted from client short-circuit caches, because the 2761 // storage will no longer be same, and thus will require validating 2762 // checksum. This also stops a client from holding file descriptors, 2763 // which would prevent the OS from reclaiming the memory. 2764 ExtendedBlock extendedBlock = 2765 new ExtendedBlock(bpid, newReplicaInfo); 2766 datanode.getShortCircuitRegistry().processBlockInvalidation( 2767 ExtendedBlockId.fromExtendedBlock(extendedBlock)); 2768 datanode.notifyNamenodeReceivedBlock( 2769 extendedBlock, null, newReplicaInfo.getStorageUuid()); 2770 2771 // Remove the old replicas 2772 if (blockFile.delete() || !blockFile.exists()) { 2773 ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed); 2774 if (metaFile.delete() || !metaFile.exists()) { 2775 ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed); 2776 } 2777 } 2778 2779 // If deletion failed then the directory scanner will cleanup the blocks 2780 // eventually. 2781 } 2782 2783 class LazyWriter implements Runnable { 2784 private volatile boolean shouldRun = true; 2785 final int checkpointerInterval; 2786 final float lowWatermarkFreeSpacePercentage; 2787 final long lowWatermarkFreeSpaceBytes; 2788 2789 LazyWriter(Configuration conf)2790 public LazyWriter(Configuration conf) { 2791 this.checkpointerInterval = conf.getInt( 2792 DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 2793 DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC); 2794 this.lowWatermarkFreeSpacePercentage = conf.getFloat( 2795 DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT, 2796 DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT); 2797 this.lowWatermarkFreeSpaceBytes = conf.getLong( 2798 DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, 2799 DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT); 2800 } 2801 2802 /** 2803 * Checkpoint a pending replica to persistent storage now. 2804 * If we fail then move the replica to the end of the queue. 
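   * The replica is re-checked under the dataset lock (it may have been
   * deleted, or a copy may already exist on persistent storage), a target
   * volume of StorageType.DEFAULT is picked by the volume choosing policy,
   * and the copy itself is submitted to the asynchronous lazy persist
   * service rather than performed inline.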
2805 * @return true if there is more work to be done, false otherwise. 2806 */ saveNextReplica()2807 private boolean saveNextReplica() { 2808 RamDiskReplica block = null; 2809 FsVolumeReference targetReference; 2810 FsVolumeImpl targetVolume; 2811 ReplicaInfo replicaInfo; 2812 boolean succeeded = false; 2813 2814 try { 2815 block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); 2816 if (block != null) { 2817 synchronized (FsDatasetImpl.this) { 2818 replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); 2819 2820 // If replicaInfo is null, the block was either deleted before 2821 // it could be checkpointed or it is already on persistent storage. 2822 // This can occur if a second replica on persistent storage was found 2823 // after the lazy write was scheduled. 2824 if (replicaInfo != null && 2825 replicaInfo.getVolume().isTransientStorage()) { 2826 // Pick a target volume to persist the block. 2827 targetReference = volumes.getNextVolume( 2828 StorageType.DEFAULT, replicaInfo.getNumBytes()); 2829 targetVolume = (FsVolumeImpl) targetReference.getVolume(); 2830 2831 ramDiskReplicaTracker.recordStartLazyPersist( 2832 block.getBlockPoolId(), block.getBlockId(), targetVolume); 2833 2834 if (LOG.isDebugEnabled()) { 2835 LOG.debug("LazyWriter: Start persisting RamDisk block:" 2836 + " block pool Id: " + block.getBlockPoolId() 2837 + " block id: " + block.getBlockId() 2838 + " on target volume " + targetVolume); 2839 } 2840 2841 asyncLazyPersistService.submitLazyPersistTask( 2842 block.getBlockPoolId(), block.getBlockId(), 2843 replicaInfo.getGenerationStamp(), block.getCreationTime(), 2844 replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), 2845 targetReference); 2846 } 2847 } 2848 } 2849 succeeded = true; 2850 } catch(IOException ioe) { 2851 LOG.warn("Exception saving replica " + block, ioe); 2852 } finally { 2853 if (!succeeded && block != null) { 2854 LOG.warn("Failed to save replica " + block + ". re-enqueueing it."); 2855 onFailLazyPersist(block.getBlockPoolId(), block.getBlockId()); 2856 } 2857 } 2858 return succeeded; 2859 } 2860 transientFreeSpaceBelowThreshold()2861 private boolean transientFreeSpaceBelowThreshold() throws IOException { 2862 long free = 0; 2863 long capacity = 0; 2864 float percentFree = 0.0f; 2865 2866 // Don't worry about fragmentation for now. We don't expect more than one 2867 // transient volume per DN. 2868 for (FsVolumeImpl v : getVolumes()) { 2869 try (FsVolumeReference ref = v.obtainReference()) { 2870 if (v.isTransientStorage()) { 2871 capacity += v.getCapacity(); 2872 free += v.getAvailable(); 2873 } 2874 } catch (ClosedChannelException e) { 2875 // ignore. 2876 } 2877 } 2878 2879 if (capacity == 0) { 2880 return false; 2881 } 2882 2883 percentFree = (float) ((double)free * 100 / capacity); 2884 return (percentFree < lowWatermarkFreeSpacePercentage) || 2885 (free < lowWatermarkFreeSpaceBytes); 2886 } 2887 2888 /** 2889 * Attempt to evict one or more transient block replicas we have at least 2890 * spaceNeeded bytes free. 
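   * Candidates are drained (up to MAX_BLOCK_EVICTIONS_PER_ITERATION per
   * pass) while free space on the transient volumes stays below the
   * configured low watermark. An evicted replica has already been lazily
   * persisted, so only the RAM disk copy is removed and the saved copy is
   * activated in its place.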
2891 */ evictBlocks()2892 private void evictBlocks() throws IOException { 2893 int iterations = 0; 2894 2895 while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION && 2896 transientFreeSpaceBelowThreshold()) { 2897 RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction(); 2898 2899 if (replicaState == null) { 2900 break; 2901 } 2902 2903 if (LOG.isDebugEnabled()) { 2904 LOG.debug("Evicting block " + replicaState); 2905 } 2906 2907 ReplicaInfo replicaInfo, newReplicaInfo; 2908 File blockFile, metaFile; 2909 long blockFileUsed, metaFileUsed; 2910 final String bpid = replicaState.getBlockPoolId(); 2911 2912 synchronized (FsDatasetImpl.this) { 2913 replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId()); 2914 Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); 2915 blockFile = replicaInfo.getBlockFile(); 2916 metaFile = replicaInfo.getMetaFile(); 2917 blockFileUsed = blockFile.length(); 2918 metaFileUsed = metaFile.length(); 2919 ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(), 2920 replicaState.getBlockId(), false); 2921 2922 // Move the replica from lazyPersist/ to finalized/ on target volume 2923 BlockPoolSlice bpSlice = 2924 replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid); 2925 File newBlockFile = bpSlice.activateSavedReplica( 2926 replicaInfo, replicaState.getSavedMetaFile(), 2927 replicaState.getSavedBlockFile()); 2928 2929 newReplicaInfo = 2930 new FinalizedReplica(replicaInfo.getBlockId(), 2931 replicaInfo.getBytesOnDisk(), 2932 replicaInfo.getGenerationStamp(), 2933 replicaState.getLazyPersistVolume(), 2934 newBlockFile.getParentFile()); 2935 2936 // Update the volumeMap entry. 2937 volumeMap.add(bpid, newReplicaInfo); 2938 2939 // Update metrics 2940 datanode.getMetrics().incrRamDiskBlocksEvicted(); 2941 datanode.getMetrics().addRamDiskBlocksEvictionWindowMs( 2942 Time.monotonicNow() - replicaState.getCreationTime()); 2943 if (replicaState.getNumReads() == 0) { 2944 datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead(); 2945 } 2946 } 2947 2948 removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile, 2949 blockFileUsed, metaFileUsed, bpid); 2950 } 2951 } 2952 2953 @Override run()2954 public void run() { 2955 int numSuccessiveFailures = 0; 2956 2957 while (fsRunning && shouldRun) { 2958 try { 2959 numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1); 2960 evictBlocks(); 2961 2962 // Sleep if we have no more work to do or if it looks like we are not 2963 // making any forward progress. This is to ensure that if all persist 2964 // operations are failing we don't keep retrying them in a tight loop. 
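          // The sleep below lasts checkpointerInterval seconds (from
          // DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC) and the failure counter
          // is reset afterwards so the next pass starts fresh.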
2965 if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) { 2966 Thread.sleep(checkpointerInterval * 1000); 2967 numSuccessiveFailures = 0; 2968 } 2969 } catch (InterruptedException e) { 2970 LOG.info("LazyWriter was interrupted, exiting"); 2971 break; 2972 } catch (Exception e) { 2973 LOG.warn("Ignoring exception in LazyWriter:", e); 2974 } 2975 } 2976 } 2977 stop()2978 public void stop() { 2979 shouldRun = false; 2980 } 2981 } 2982 2983 @Override setPinning(ExtendedBlock block)2984 public void setPinning(ExtendedBlock block) throws IOException { 2985 if (!blockPinningEnabled) { 2986 return; 2987 } 2988 2989 File f = getBlockFile(block); 2990 Path p = new Path(f.getAbsolutePath()); 2991 2992 FsPermission oldPermission = localFS.getFileStatus( 2993 new Path(f.getAbsolutePath())).getPermission(); 2994 //sticky bit is used for pinning purpose 2995 FsPermission permission = new FsPermission(oldPermission.getUserAction(), 2996 oldPermission.getGroupAction(), oldPermission.getOtherAction(), true); 2997 localFS.setPermission(p, permission); 2998 } 2999 3000 @Override getPinning(ExtendedBlock block)3001 public boolean getPinning(ExtendedBlock block) throws IOException { 3002 if (!blockPinningEnabled) { 3003 return false; 3004 } 3005 File f = getBlockFile(block); 3006 3007 FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath())); 3008 return fss.getPermission().getStickyBit(); 3009 } 3010 3011 @Override isDeletingBlock(String bpid, long blockId)3012 public boolean isDeletingBlock(String bpid, long blockId) { 3013 synchronized(deletingBlock) { 3014 Set<Long> s = deletingBlock.get(bpid); 3015 return s != null ? s.contains(blockId) : false; 3016 } 3017 } 3018 removeDeletedBlocks(String bpid, Set<Long> blockIds)3019 public void removeDeletedBlocks(String bpid, Set<Long> blockIds) { 3020 synchronized (deletingBlock) { 3021 Set<Long> s = deletingBlock.get(bpid); 3022 if (s != null) { 3023 for (Long id : blockIds) { 3024 s.remove(id); 3025 } 3026 } 3027 } 3028 } 3029 addDeletingBlock(String bpid, Long blockId)3030 private void addDeletingBlock(String bpid, Long blockId) { 3031 synchronized(deletingBlock) { 3032 Set<Long> s = deletingBlock.get(bpid); 3033 if (s == null) { 3034 s = new HashSet<Long>(); 3035 deletingBlock.put(bpid, s); 3036 } 3037 s.add(blockId); 3038 } 3039 } 3040 } 3041 3042