/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/**************************************************
 * FSDataset manages a set of data blocks.  Each block
 * has a unique name and an extent on disk.
 *
 ***************************************************/
@InterfaceAudience.Private
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
  static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
  private final static boolean isNativeIOAvailable;
  static {
    isNativeIOAvailable = NativeIO.isAvailable();
    if (Path.WINDOWS && !isNativeIOAvailable) {
      LOG.warn("Data node cannot fully support concurrent reading"
          + " and writing without native code extensions on Windows.");
    }
  }

  @Override // FsDatasetSpi
  public List<FsVolumeImpl> getVolumes() {
    return volumes.getVolumes();
  }

  @Override
  public DatanodeStorage getStorage(final String storageUuid) {
    return storageMap.get(storageUuid);
  }

  @Override // FsDatasetSpi
  public StorageReport[] getStorageReports(String bpid)
      throws IOException {
    List<StorageReport> reports;
    synchronized (statsLock) {
      List<FsVolumeImpl> curVolumes = getVolumes();
      reports = new ArrayList<>(curVolumes.size());
      for (FsVolumeImpl volume : curVolumes) {
        try (FsVolumeReference ref = volume.obtainReference()) {
          StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
              false,
              volume.getCapacity(),
              volume.getDfsUsed(),
              volume.getAvailable(),
              volume.getBlockPoolUsed(bpid));
          reports.add(sr);
        } catch (ClosedChannelException e) {
          continue;
        }
      }
    }

    return reports.toArray(new StorageReport[reports.size()]);
  }

  @Override
  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
    final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
    return r != null ? (FsVolumeImpl) r.getVolume() : null;
  }

  @Override // FsDatasetSpi
  public synchronized Block getStoredBlock(String bpid, long blkid)
      throws IOException {
    File blockfile = getFile(bpid, blkid, false);
    if (blockfile == null) {
      return null;
    }
    final File metafile = FsDatasetUtil.findMetaFile(blockfile);
    final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
    return new Block(blkid, blockfile.length(), gs);
  }


  /**
   * This should be primarily used for testing.
   * @return a clone of the replica stored in datanode memory
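   *
   * A minimal test-side sketch (hypothetical block pool id and block id):
   * <pre>{@code
   * ReplicaInfo snapshot = dataset.fetchReplicaInfo("BP-1", 1000L);
   * // snapshot is a copy; mutating it does not change the live replica map
   * }</pre>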
   */
  ReplicaInfo fetchReplicaInfo(String bpid, long blockId) {
    ReplicaInfo r = volumeMap.get(bpid, blockId);
    if (r == null) {
      return null;
    }
    switch (r.getState()) {
    case FINALIZED:
      return new FinalizedReplica((FinalizedReplica)r);
    case RBW:
      return new ReplicaBeingWritten((ReplicaBeingWritten)r);
    case RWR:
      return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r);
    case RUR:
      return new ReplicaUnderRecovery((ReplicaUnderRecovery)r);
    case TEMPORARY:
      return new ReplicaInPipeline((ReplicaInPipeline)r);
    }
    return null;
  }

  @Override // FsDatasetSpi
  public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
      throws IOException {
    File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
    if (meta == null || !meta.exists()) {
      return null;
    }
    if (isNativeIOAvailable) {
      return new LengthInputStream(
          NativeIO.getShareDeleteFileInputStream(meta),
          meta.length());
    }
    return new LengthInputStream(new FileInputStream(meta), meta.length());
  }

  final DataNode datanode;
  final DataStorage dataStorage;
  final FsVolumeList volumes;
  final Map<String, DatanodeStorage> storageMap;
  final FsDatasetAsyncDiskService asyncDiskService;
  final Daemon lazyWriter;
  final FsDatasetCache cacheManager;
  private final Configuration conf;
  private final int validVolsRequired;
  private volatile boolean fsRunning;

  final ReplicaMap volumeMap;
  final Map<String, Set<Long>> deletingBlock;
  final RamDiskReplicaTracker ramDiskReplicaTracker;
  final RamDiskAsyncLazyPersistService asyncLazyPersistService;

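  // Upper bound on RAM disk block evictions attempted in one lazy-writer pass.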
  private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;


  // Used for synchronizing access to usage stats
  private final Object statsLock = new Object();

  final LocalFileSystem localFS;

  private boolean blockPinningEnabled;

  /**
   * An FSDataset has a directory where it loads its data files.
   */
  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    this.fsRunning = true;
    this.datanode = datanode;
    this.dataStorage = storage;
    this.conf = conf;
    // The number of volumes required for operation is the total number
    // of volumes minus the number of failed volumes we can tolerate.
    final int volFailuresTolerated =
      conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                  DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);

    String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
    Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
    List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
        dataLocations, storage);

    int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
    int volsFailed = volumeFailureInfos.size();
    this.validVolsRequired = volsConfigured - volFailuresTolerated;

    if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
      throw new DiskErrorException("Invalid volume failure"
          + " config value: " + volFailuresTolerated);
    }
    if (volsFailed > volFailuresTolerated) {
      throw new DiskErrorException("Too many failed volumes - "
          + "current valid volumes: " + storage.getNumStorageDirs()
          + ", volumes configured: " + volsConfigured
          + ", volumes failed: " + volsFailed
          + ", volume failures tolerated: " + volFailuresTolerated);
    }

    storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
    volumeMap = new ReplicaMap(this);
    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);

    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
        blockChooserImpl);
    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
    deletingBlock = new HashMap<String, Set<Long>>();

    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      addVolume(dataLocations, storage.getStorageDir(idx));
    }
    setupAsyncLazyPersistThreads();

    cacheManager = new FsDatasetCache(this);

    // Start the lazy writer once we have built the replica maps.
    lazyWriter = new Daemon(new LazyWriter(conf));
    lazyWriter.start();
    registerMBean(datanode.getDatanodeUuid());
    localFS = FileSystem.getLocal(conf);
    blockPinningEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
  }

  /**
   * Gets initial volume failure information for all volumes that failed
   * immediately at startup.  The method works by determining the set difference
   * between all configured storage locations and the actual storage locations in
   * use after attempting to put all of them into service.
   *
   * @return each storage location that has failed
   */
  private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
      Collection<StorageLocation> dataLocations, DataStorage storage) {
    Set<String> failedLocationSet = Sets.newHashSetWithExpectedSize(
        dataLocations.size());
    for (StorageLocation sl: dataLocations) {
      failedLocationSet.add(sl.getFile().getAbsolutePath());
    }
    for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
         it.hasNext(); ) {
      Storage.StorageDirectory sd = it.next();
      failedLocationSet.remove(sd.getRoot().getAbsolutePath());
    }
    List<VolumeFailureInfo> volumeFailureInfos = Lists.newArrayListWithCapacity(
        failedLocationSet.size());
    long failureDate = Time.now();
    for (String failedStorageLocation: failedLocationSet) {
      volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation,
          failureDate));
    }
    return volumeFailureInfos;
  }

  private void addVolume(Collection<StorageLocation> dataLocations,
      Storage.StorageDirectory sd) throws IOException {
    final File dir = sd.getCurrentDir();
    final StorageType storageType =
        getStorageTypeFromLocations(dataLocations, sd.getRoot());

    // If an IOException is raised from FsVolumeImpl() or getVolumeMap(),
    // nothing needs to be rolled back to keep the various data structures,
    // e.g., storageMap and asyncDiskService, consistent.
    FsVolumeImpl fsVolume = new FsVolumeImpl(
        this, sd.getStorageUuid(), dir, this.conf, storageType);
    FsVolumeReference ref = fsVolume.obtainReference();
    ReplicaMap tempVolumeMap = new ReplicaMap(this);
    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
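    // The replica map for the new volume was built outside the dataset lock;
    // merge it and publish the volume atomically in the block below.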

    synchronized (this) {
      volumeMap.addAll(tempVolumeMap);
      storageMap.put(sd.getStorageUuid(),
          new DatanodeStorage(sd.getStorageUuid(),
              DatanodeStorage.State.NORMAL,
              storageType));
      asyncDiskService.addVolume(sd.getCurrentDir());
      volumes.addVolume(ref);
    }

    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
  }

  @VisibleForTesting
  public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
      StorageType storageType) throws IOException {
    return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
  }

  @Override
  public void addVolume(final StorageLocation location,
      final List<NamespaceInfo> nsInfos)
      throws IOException {
    final File dir = location.getFile();

    // Prepare volume in DataStorage
    final DataStorage.VolumeBuilder builder;
    try {
      builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
    } catch (IOException e) {
      volumes.addVolumeFailureInfo(new VolumeFailureInfo(
          location.getFile().getAbsolutePath(), Time.now()));
      throw e;
    }

    final Storage.StorageDirectory sd = builder.getStorageDirectory();

    StorageType storageType = location.getStorageType();
    final FsVolumeImpl fsVolume =
        createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
    final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
    ArrayList<IOException> exceptions = Lists.newArrayList();

    for (final NamespaceInfo nsInfo : nsInfos) {
      String bpid = nsInfo.getBlockPoolID();
      try {
        fsVolume.addBlockPool(bpid, this.conf);
        fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
      } catch (IOException e) {
        LOG.warn("Caught exception when adding " + fsVolume +
            ". Will throw later.", e);
        exceptions.add(e);
      }
    }
    if (!exceptions.isEmpty()) {
      try {
        sd.unlock();
      } catch (IOException e) {
        exceptions.add(e);
      }
      throw MultipleIOException.createIOException(exceptions);
    }

    final FsVolumeReference ref = fsVolume.obtainReference();
    setupAsyncLazyPersistThread(fsVolume);

    builder.build();
    synchronized (this) {
      volumeMap.addAll(tempVolumeMap);
      storageMap.put(sd.getStorageUuid(),
          new DatanodeStorage(sd.getStorageUuid(),
              DatanodeStorage.State.NORMAL,
              storageType));
      asyncDiskService.addVolume(sd.getCurrentDir());
      volumes.addVolume(ref);
    }
    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
  }

  /**
   * Removes a set of volumes from FsDataset.
   * @param volumesToRemove a set of absolute root paths of the volumes
   *        to remove.
   * @param clearFailure set true to clear failure information.
   */
  @Override
  public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
    // Make sure that all volumes are absolute paths.
    for (File vol : volumesToRemove) {
      Preconditions.checkArgument(vol.isAbsolute(),
          String.format("%s is not an absolute path.", vol.getPath()));
    }

    Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
    List<String> storageToRemove = new ArrayList<>();
    synchronized (this) {
      for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
        Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
        final File absRoot = sd.getRoot().getAbsoluteFile();
        if (volumesToRemove.contains(absRoot)) {
          LOG.info("Removing " + absRoot + " from FsDataset.");

          // Disable the volume from the service.
          asyncDiskService.removeVolume(sd.getCurrentDir());
          volumes.removeVolume(absRoot, clearFailure);

          // Remove all replica information for the blocks on the volume.
          // Unlike updating the volumeMap in addVolume(), this operation does
          // not scan disks.
          for (String bpid : volumeMap.getBlockPoolList()) {
            List<ReplicaInfo> blocks = new ArrayList<>();
            for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
                 it.hasNext(); ) {
              ReplicaInfo block = it.next();
              final File absBasePath =
                  new File(block.getVolume().getBasePath()).getAbsoluteFile();
              if (absBasePath.equals(absRoot)) {
                blocks.add(block);
                it.remove();
              }
            }
            blkToInvalidate.put(bpid, blocks);
          }

          storageToRemove.add(sd.getStorageUuid());
        }
      }
      setupAsyncLazyPersistThreads();
    }

    // Call this outside the lock.
    for (Map.Entry<String, List<ReplicaInfo>> entry :
        blkToInvalidate.entrySet()) {
      String bpid = entry.getKey();
      List<ReplicaInfo> blocks = entry.getValue();
      for (ReplicaInfo block : blocks) {
        invalidate(bpid, block);
      }
    }

    synchronized (this) {
      for (String storageUuid : storageToRemove) {
        storageMap.remove(storageUuid);
      }
    }
  }

  private StorageType getStorageTypeFromLocations(
      Collection<StorageLocation> dataLocations, File dir) {
    for (StorageLocation dataLocation : dataLocations) {
      if (dataLocation.getFile().equals(dir)) {
        return dataLocation.getStorageType();
      }
    }
    return StorageType.DEFAULT;
  }

  /**
   * Return the total space used by dfs datanode
   */
  @Override // FSDatasetMBean
  public long getDfsUsed() throws IOException {
    synchronized(statsLock) {
      return volumes.getDfsUsed();
    }
  }

  /**
   * Return the total space used by the given block pool on this datanode
   */
  @Override // FSDatasetMBean
  public long getBlockPoolUsed(String bpid) throws IOException {
    synchronized(statsLock) {
      return volumes.getBlockPoolUsed(bpid);
    }
  }

  /**
   * Return true if there are still valid volumes on the DataNode.
   */
  @Override // FsDatasetSpi
  public boolean hasEnoughResource() {
    return getVolumes().size() >= validVolsRequired;
  }

  /**
   * Return total capacity, used and unused
   */
  @Override // FSDatasetMBean
  public long getCapacity() {
    synchronized(statsLock) {
      return volumes.getCapacity();
    }
  }

  /**
   * Return how many bytes can still be stored in the FSDataset
   */
  @Override // FSDatasetMBean
  public long getRemaining() throws IOException {
    synchronized(statsLock) {
      return volumes.getRemaining();
    }
  }

  /**
   * Return the number of failed volumes in the FSDataset.
   */
  @Override // FSDatasetMBean
  public int getNumFailedVolumes() {
    return volumes.getVolumeFailureInfos().length;
  }

  @Override // FSDatasetMBean
  public String[] getFailedStorageLocations() {
    VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
    List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
        infos.length);
    for (VolumeFailureInfo info: infos) {
      failedStorageLocations.add(info.getFailedStorageLocation());
    }
    return failedStorageLocations.toArray(
        new String[failedStorageLocations.size()]);
  }

  @Override // FSDatasetMBean
  public long getLastVolumeFailureDate() {
    long lastVolumeFailureDate = 0;
    for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
      long failureDate = info.getFailureDate();
      if (failureDate > lastVolumeFailureDate) {
        lastVolumeFailureDate = failureDate;
      }
    }
    return lastVolumeFailureDate;
  }

  @Override // FSDatasetMBean
  public long getEstimatedCapacityLostTotal() {
    long estimatedCapacityLostTotal = 0;
    for (VolumeFailureInfo info: volumes.getVolumeFailureInfos()) {
      estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
    }
    return estimatedCapacityLostTotal;
  }

  @Override // FsDatasetSpi
  public VolumeFailureSummary getVolumeFailureSummary() {
    VolumeFailureInfo[] infos = volumes.getVolumeFailureInfos();
    if (infos.length == 0) {
      return null;
    }
    List<String> failedStorageLocations = Lists.newArrayListWithCapacity(
        infos.length);
    long lastVolumeFailureDate = 0;
    long estimatedCapacityLostTotal = 0;
    for (VolumeFailureInfo info: infos) {
      failedStorageLocations.add(info.getFailedStorageLocation());
      long failureDate = info.getFailureDate();
      if (failureDate > lastVolumeFailureDate) {
        lastVolumeFailureDate = failureDate;
      }
      estimatedCapacityLostTotal += info.getEstimatedCapacityLost();
    }
    return new VolumeFailureSummary(
        failedStorageLocations.toArray(new String[failedStorageLocations.size()]),
        lastVolumeFailureDate, estimatedCapacityLostTotal);
  }

  @Override // FSDatasetMBean
  public long getCacheUsed() {
    return cacheManager.getCacheUsed();
  }

  @Override // FSDatasetMBean
  public long getCacheCapacity() {
    return cacheManager.getCacheCapacity();
  }

  @Override // FSDatasetMBean
  public long getNumBlocksFailedToCache() {
    return cacheManager.getNumBlocksFailedToCache();
  }

  @Override // FSDatasetMBean
  public long getNumBlocksFailedToUncache() {
    return cacheManager.getNumBlocksFailedToUncache();
  }

  @Override // FSDatasetMBean
  public long getNumBlocksCached() {
    return cacheManager.getNumBlocksCached();
  }

  /**
   * Find the block's on-disk length
   */
  @Override // FsDatasetSpi
  public long getLength(ExtendedBlock b) throws IOException {
    return getBlockFile(b).length();
  }

  /**
   * Get File name for a given block.
   */
  private File getBlockFile(ExtendedBlock b) throws IOException {
    return getBlockFile(b.getBlockPoolId(), b.getBlockId());
  }

  /**
   * Get File name for a given block.
   */
  File getBlockFile(String bpid, long blockId) throws IOException {
    File f = validateBlockFile(bpid, blockId);
    if (f == null) {
      throw new IOException("BlockId " + blockId + " is not valid.");
    }
    return f;
  }

  /**
   * Return the File associated with a block, without first
   * checking that it exists. This should be used when the
   * next operation is going to open the file for read anyway,
   * and thus the exists check is redundant.
   *
   * @param touch if true then update the last access timestamp of the
   *              block. Currently used for blocks on transient storage.
   */
  private File getBlockFileNoExistsCheck(ExtendedBlock b,
                                         boolean touch)
      throws IOException {
    final File f;
    synchronized(this) {
      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
    }
    if (f == null) {
      throw new IOException("Block " + b + " is not valid");
    }
    return f;
  }

  @Override // FsDatasetSpi
  public InputStream getBlockInputStream(ExtendedBlock b,
      long seekOffset) throws IOException {
    File blockFile = getBlockFileNoExistsCheck(b, true);
    if (isNativeIOAvailable) {
      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
    } else {
      try {
        return openAndSeek(blockFile, seekOffset);
      } catch (FileNotFoundException fnfe) {
        throw new IOException("Block " + b + " is not valid. " +
            "Expected block file at " + blockFile + " does not exist.");
      }
    }
  }

  /**
   * Get the meta info of a block stored in volumeMap. To find a block,
   * block pool Id, block Id and generation stamp must match.
   * @param b extended block
   * @return the meta replica information
   * @throws ReplicaNotFoundException if no entry is in the map or
   *                        there is a generation stamp mismatch
   */
  ReplicaInfo getReplicaInfo(ExtendedBlock b)
      throws ReplicaNotFoundException {
    ReplicaInfo info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
    if (info == null) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
    }
    return info;
  }

  /**
   * Get the meta info of a block stored in volumeMap. The block is looked up
   * without matching the generation stamp.
   * @param bpid block pool Id
   * @param blkid block Id
   * @return the meta replica information
   * @throws ReplicaNotFoundException if no entry is in the map
   */
  private ReplicaInfo getReplicaInfo(String bpid, long blkid)
      throws ReplicaNotFoundException {
    ReplicaInfo info = volumeMap.get(bpid, blkid);
    if (info == null) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
    }
    return info;
  }

  /**
   * Returns handles to the block file and its metadata file
   */
  @Override // FsDatasetSpi
  public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
      long blkOffset, long metaOffset) throws IOException {
    ReplicaInfo info = getReplicaInfo(b);
    FsVolumeReference ref = info.getVolume().obtainReference();
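    // On failure below, close what has been opened so far in reverse order
    // of acquisition so the volume reference is not leaked.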
    try {
      InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
      try {
        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
        return new ReplicaInputStreams(blockInStream, metaInStream, ref);
      } catch (IOException e) {
        IOUtils.cleanup(null, blockInStream);
        throw e;
      }
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }
  }

  private static FileInputStream openAndSeek(File file, long offset)
      throws IOException {
    RandomAccessFile raf = null;
    try {
      raf = new RandomAccessFile(file, "r");
      if (offset > 0) {
        raf.seek(offset);
      }
      return new FileInputStream(raf.getFD());
    } catch (IOException ioe) {
      IOUtils.cleanup(null, raf);
      throw ioe;
    }
  }

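  /**
   * Move a block file and its meta file into destdir, renaming the block
   * file to the block's canonical name. A sketch of the resulting layout,
   * with hypothetical paths for illustration:
   *
   * <pre>{@code
   * // before: /vol0/rbw/blk_123  and  /vol0/rbw/blk_123_1001.meta
   * // after:  destdir/blk_123    and  destdir/blk_123_1001.meta
   * }</pre>
   */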
  static File moveBlockFiles(Block b, File srcfile, File destdir)
      throws IOException {
    final File dstfile = new File(destdir, b.getBlockName());
    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
    try {
      NativeIO.renameTo(srcmeta, dstmeta);
    } catch (IOException e) {
      throw new IOException("Failed to move meta file for " + b
          + " from " + srcmeta + " to " + dstmeta, e);
    }
    try {
      NativeIO.renameTo(srcfile, dstfile);
    } catch (IOException e) {
      throw new IOException("Failed to move block file for " + b
          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("addFinalizedBlock: Moved " + srcmeta + " to " + dstmeta
          + " and " + srcfile + " to " + dstfile);
    }
    return dstfile;
  }

  /**
   * Copy the block and meta files for the given block to the given destination.
   * @return the new meta and block files.
   * @throws IOException
   */
  static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
      File srcFile, File destRoot, boolean calculateChecksum)
      throws IOException {
    final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
    final File dstFile = new File(destDir, srcFile.getName());
    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
    return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum);
  }

  static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
                               File dstFile, boolean calculateChecksum)
      throws IOException {
    if (calculateChecksum) {
      computeChecksum(srcMeta, dstMeta, srcFile);
    } else {
      try {
        Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
      } catch (IOException e) {
        throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
      }
    }

    try {
      Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
    } catch (IOException e) {
      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
    }
    if (LOG.isDebugEnabled()) {
      if (calculateChecksum) {
        LOG.debug("Copied " + srcMeta + " to " + dstMeta
            + " and calculated checksum");
      } else {
        LOG.debug("Copied " + srcFile + " to " + dstFile);
      }
    }
    return new File[] {dstMeta, dstFile};
  }

  /**
   * Move block files from one storage to another storage.
   * @return the old replica info
   * @throws IOException
   */
  @Override
  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
      StorageType targetStorageType) throws IOException {
    ReplicaInfo replicaInfo = getReplicaInfo(block);
    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
    }
    if (replicaInfo.getNumBytes() != block.getNumBytes()) {
      throw new IOException("Corrupted replica " + replicaInfo
          + " with a length of " + replicaInfo.getNumBytes()
          + " expected length is " + block.getNumBytes());
    }
    if (replicaInfo.getVolume().getStorageType() == targetStorageType) {
      throw new ReplicaAlreadyExistsException("Replica " + replicaInfo
          + " already exists on storage " + targetStorageType);
    }

    if (replicaInfo.isOnTransientStorage()) {
      // Block movement from RAM_DISK will be done by LazyPersist mechanism
      throw new IOException("Replica " + replicaInfo
          + " cannot be moved from storageType : "
          + replicaInfo.getVolume().getStorageType());
    }

    try (FsVolumeReference volumeRef = volumes.getNextVolume(
        targetStorageType, block.getNumBytes())) {
      File oldBlockFile = replicaInfo.getBlockFile();
      File oldMetaFile = replicaInfo.getMetaFile();
      FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
      // Copy files to temp dir first
      File[] blockFiles = copyBlockFiles(block.getBlockId(),
          block.getGenerationStamp(), oldMetaFile, oldBlockFile,
          targetVolume.getTmpDir(block.getBlockPoolId()),
          replicaInfo.isOnTransientStorage());

      ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
          replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
          targetVolume, blockFiles[0].getParentFile(), 0);
      newReplicaInfo.setNumBytes(blockFiles[1].length());
      // Finalize the copied files
      newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);

      removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
          oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
    }

    // Replace the old block if any to reschedule the scanning.
    return replicaInfo;
  }

  /**
   * Compute and store the checksum for a block file that does not already have
   * its checksum computed.
   *
   * @param srcMeta source meta file, containing only the checksum header, not a
   *     calculated checksum
   * @param dstMeta destination meta file, into which this method will write a
   *     full computed checksum
   * @param blockFile block file for which the checksum will be computed
   * @throws IOException
   */
  private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
      throws IOException {
    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
    final byte[] data = new byte[1 << 16];
    final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];

    DataOutputStream metaOut = null;
    try {
      File parentFile = dstMeta.getParentFile();
      if (parentFile != null) {
        if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
          throw new IOException("Destination '" + parentFile
              + "' directory cannot be created");
        }
      }
      metaOut = new DataOutputStream(new BufferedOutputStream(
          new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
      BlockMetadataHeader.writeHeader(metaOut, checksum);

      int offset = 0;
      try (InputStream dataIn = isNativeIOAvailable ?
          NativeIO.getShareDeleteFileInputStream(blockFile) :
          new FileInputStream(blockFile)) {

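        // 'offset' carries the tail bytes of the previous read that did not
        // fill a whole checksum chunk; they are shifted to the front of
        // 'data' below and completed by the next read.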
        for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
          if (n > 0) {
            n += offset;
            offset = n % checksum.getBytesPerChecksum();
            final int length = n - offset;

            if (length > 0) {
              checksum.calculateChunkedSums(data, 0, length, crcs, 0);
              metaOut.write(crcs, 0, checksum.getChecksumSize(length));

              System.arraycopy(data, length, data, 0, offset);
            }
          }
        }
      }

      // calculate and write the last crc
      checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
      metaOut.write(crcs, 0, 4);
    } finally {
      IOUtils.cleanup(LOG, metaOut);
    }
  }

  private static void truncateBlock(File blockFile, File metaFile,
      long oldlen, long newlen) throws IOException {
    LOG.info("truncateBlock: blockFile=" + blockFile
        + ", metaFile=" + metaFile
        + ", oldlen=" + oldlen
        + ", newlen=" + newlen);

    if (newlen == oldlen) {
      return;
    }
    if (newlen > oldlen) {
      throw new IOException("Cannot truncate block from oldlen (=" + oldlen
          + ") to newlen (=" + newlen + ")");
    }

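    // n is the number of bytesPerChecksum-sized chunks covering newlen bytes;
    // the new meta file holds the header plus n checksum values, and only the
    // checksum of the (possibly partial) last chunk must be recomputed.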
    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
    int checksumsize = dcs.getChecksumSize();
    int bpc = dcs.getBytesPerChecksum();
    long n = (newlen - 1)/bpc + 1;
    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
    long lastchunkoffset = (n - 1)*bpc;
    int lastchunksize = (int)(newlen - lastchunkoffset);
    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];

    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
    try {
      //truncate blockFile
      blockRAF.setLength(newlen);

      //read last chunk
      blockRAF.seek(lastchunkoffset);
      blockRAF.readFully(b, 0, lastchunksize);
    } finally {
      blockRAF.close();
    }

    //compute checksum
    dcs.update(b, 0, lastchunksize);
    dcs.writeValue(b, 0, false);

    //update metaFile
    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
    try {
      metaRAF.setLength(newmetalen);
      metaRAF.seek(newmetalen - checksumsize);
      metaRAF.write(b, 0, checksumsize);
    } finally {
      metaRAF.close();
    }
  }


  @Override  // FsDatasetSpi
  public synchronized ReplicaHandler append(ExtendedBlock b,
      long newGS, long expectedBlockLen) throws IOException {
    // This can happen if the block was successfully finalized because all
    // packets were successfully processed at the Datanode but the ack for
    // some of the packets was not received by the client. The client
    // re-opens the connection and retries sending those packets.
    // The other reason is that an "append" is occurring to this block.

    // check the validity of the parameter
    if (newGS < b.getGenerationStamp()) {
      throw new IOException("The new generation stamp " + newGS +
          " should be greater than the replica " + b + "'s generation stamp");
    }
    ReplicaInfo replicaInfo = getReplicaInfo(b);
    LOG.info("Appending to " + replicaInfo);
    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
    }
    if (replicaInfo.getNumBytes() != expectedBlockLen) {
      throw new IOException("Corrupted replica " + replicaInfo +
          " with a length of " + replicaInfo.getNumBytes() +
          " expected length is " + expectedBlockLen);
    }

    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
    ReplicaBeingWritten replica = null;
    try {
      replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
          b.getNumBytes());
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }
    return new ReplicaHandler(replica, ref);
  }

  /**
   * Append to a finalized replica: change the finalized replica to be an RBW
   * replica and bump its generation stamp to be the newGS.
   *
   * @param bpid block pool Id
   * @param replicaInfo a finalized replica
   * @param newGS new generation stamp
   * @param estimateBlockLen estimated block length
   * @return a RBW replica
   * @throws IOException if moving the replica from finalized directory
   *         to rbw directory fails
   */
  private synchronized ReplicaBeingWritten append(String bpid,
      FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
      throws IOException {
    // If the block is cached, start uncaching it.
    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
    // unlink the finalized replica
    replicaInfo.unlinkBlock(1);

    // construct a RBW replica with the new GS
    File blkfile = replicaInfo.getBlockFile();
    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
    if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
      throw new DiskOutOfSpaceException("Insufficient space for appending to "
          + replicaInfo);
    }
    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
    File oldmeta = replicaInfo.getMetaFile();
    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
        v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
    File newmeta = newReplicaInfo.getMetaFile();

    // rename meta file to rbw directory
    if (LOG.isDebugEnabled()) {
      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
    }
    try {
      NativeIO.renameTo(oldmeta, newmeta);
    } catch (IOException e) {
      throw new IOException("Block " + replicaInfo + " reopen failed. " +
                            " Unable to move meta file " + oldmeta +
                            " to rbw dir " + newmeta, e);
    }

    // rename block file to rbw directory
    if (LOG.isDebugEnabled()) {
      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
          + ", file length=" + blkfile.length());
    }
    try {
      NativeIO.renameTo(blkfile, newBlkFile);
    } catch (IOException e) {
      try {
        NativeIO.renameTo(newmeta, oldmeta);
      } catch (IOException ex) {
        LOG.warn("Cannot move meta file " + newmeta +
            " back to the finalized directory " + oldmeta, ex);
      }
      throw new IOException("Block " + replicaInfo + " reopen failed. " +
                              " Unable to move block file " + blkfile +
                              " to rbw dir " + newBlkFile, e);
    }

    // Replace finalized replica by a RBW replica in replicas map
    volumeMap.add(bpid, newReplicaInfo);
    v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
    return newReplicaInfo;
  }

  private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS,
      long expectedBlockLen) throws IOException {
    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

    // check state
    if (replicaInfo.getState() != ReplicaState.FINALIZED &&
        replicaInfo.getState() != ReplicaState.RBW) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
    }

    // check generation stamp
    long replicaGenerationStamp = replicaInfo.getGenerationStamp();
    if (replicaGenerationStamp < b.getGenerationStamp() ||
        replicaGenerationStamp > newGS) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
          + ". Expected GS range is [" + b.getGenerationStamp() + ", " +
          newGS + "].");
    }

    // stop the previous writer before checking a replica's length
    long replicaLen = replicaInfo.getNumBytes();
    if (replicaInfo.getState() == ReplicaState.RBW) {
      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
      // kill the previous writer
      rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
      rbw.setWriter(Thread.currentThread());
      // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
      if (replicaLen != rbw.getBytesOnDisk()
          || replicaLen != rbw.getBytesAcked()) {
        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
            ": bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
            rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
            ") are not the same.");
      }
    }

    // check block length
    if (replicaLen != expectedBlockLen) {
      throw new IOException("Corrupted replica " + replicaInfo +
          " with a length of " + replicaLen +
          " expected length is " + expectedBlockLen);
    }

    return replicaInfo;
  }

  @Override  // FsDatasetSpi
  public synchronized ReplicaHandler recoverAppend(
      ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
    LOG.info("Recover failed append to " + b);

    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);

    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
    ReplicaBeingWritten replica;
    try {
      // change the replica's state/gs etc.
      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
        replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
                         newGS, b.getNumBytes());
      } else { //RBW
        bumpReplicaGS(replicaInfo, newGS);
        replica = (ReplicaBeingWritten) replicaInfo;
      }
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }
    return new ReplicaHandler(replica, ref);
  }

  @Override // FsDatasetSpi
  public synchronized String recoverClose(ExtendedBlock b, long newGS,
      long expectedBlockLen) throws IOException {
    LOG.info("Recover failed close " + b);
    // check replica's state
    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
    // bump the replica's GS
    bumpReplicaGS(replicaInfo, newGS);
    // finalize the replica if RBW
    if (replicaInfo.getState() == ReplicaState.RBW) {
      finalizeReplica(b.getBlockPoolId(), replicaInfo);
    }
    return replicaInfo.getStorageUuid();
  }

  /**
   * Bump a replica's generation stamp to a new one.
   * Its on-disk meta file name is renamed to be the new one too.
   *
   * @param replicaInfo a replica
   * @param newGS new generation stamp
   * @throws IOException if rename fails
   */
  private void bumpReplicaGS(ReplicaInfo replicaInfo,
      long newGS) throws IOException {
    long oldGS = replicaInfo.getGenerationStamp();
    File oldmeta = replicaInfo.getMetaFile();
    replicaInfo.setGenerationStamp(newGS);
    File newmeta = replicaInfo.getMetaFile();

    // rename meta file to new GS
    if (LOG.isDebugEnabled()) {
      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
    }
    try {
      NativeIO.renameTo(oldmeta, newmeta);
    } catch (IOException e) {
      replicaInfo.setGenerationStamp(oldGS); // restore old GS
      throw new IOException("Block " + replicaInfo + " reopen failed. " +
                            " Unable to move meta file " + oldmeta +
                            " to " + newmeta, e);
    }
  }

  @Override // FsDatasetSpi
  public synchronized ReplicaHandler createRbw(
      StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
      throws IOException {
    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
        b.getBlockId());
    if (replicaInfo != null) {
      throw new ReplicaAlreadyExistsException("Block " + b +
      " already exists in state " + replicaInfo.getState() +
      " and thus cannot be created.");
    }
    // create a new block
    FsVolumeReference ref;
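    // This loop runs at most twice: if a transient (RAM disk) volume has no
    // space, fall back once to regular disk placement and retry.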
    while (true) {
      try {
        if (allowLazyPersist) {
          // First try to place the block on a transient volume.
          ref = volumes.getNextTransientVolume(b.getNumBytes());
          datanode.getMetrics().incrRamDiskBlocksWrite();
        } else {
          ref = volumes.getNextVolume(storageType, b.getNumBytes());
        }
      } catch (DiskOutOfSpaceException de) {
        if (allowLazyPersist) {
          datanode.getMetrics().incrRamDiskBlocksWriteFallback();
          allowLazyPersist = false;
          continue;
        }
        throw de;
      }
      break;
    }
    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
    // create an rbw file to hold block in the designated volume
    File f;
    try {
      f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }

    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
    return new ReplicaHandler(newReplicaInfo, ref);
  }

  @Override // FsDatasetSpi
  public synchronized ReplicaHandler recoverRbw(
      ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
      throws IOException {
    LOG.info("Recover RBW replica " + b);

    ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

    // check the replica's state
    if (replicaInfo.getState() != ReplicaState.RBW) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
    }
    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;

    LOG.info("Recovering " + rbw);

    // Stop the previous writer
    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
    rbw.setWriter(Thread.currentThread());

    // check generation stamp
    long replicaGenerationStamp = rbw.getGenerationStamp();
    if (replicaGenerationStamp < b.getGenerationStamp() ||
        replicaGenerationStamp > newGS) {
      throw new ReplicaNotFoundException(
          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
          ". Expected GS range is [" + b.getGenerationStamp() + ", " +
          newGS + "].");
    }

    // check replica length
    long bytesAcked = rbw.getBytesAcked();
    long numBytes = rbw.getNumBytes();
    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
      throw new ReplicaNotFoundException("Unmatched length replica " +
          replicaInfo + ": BytesAcked = " + bytesAcked +
          " BytesRcvd = " + numBytes + " are not in the range of [" +
          minBytesRcvd + ", " + maxBytesRcvd + "].");
    }

    FsVolumeReference ref = rbw.getVolume().obtainReference();
    try {
      // Truncate the potentially corrupt portion.
      // If the source was a client and the last node in the pipeline was lost,
      // any corrupt data written after the acked length can go unnoticed.
      if (numBytes > bytesAcked) {
        final File replicafile = rbw.getBlockFile();
        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
        rbw.setNumBytes(bytesAcked);
        rbw.setLastChecksumAndDataLen(bytesAcked, null);
      }

      // bump the replica's generation stamp to newGS
      bumpReplicaGS(rbw, newGS);
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }
    return new ReplicaHandler(rbw, ref);
  }
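
  // Worked example for the truncation above (hypothetical numbers): if the
  // rbw replica has numBytes = 1024 but only bytesAcked = 512 were
  // acknowledged by the downstream pipeline, the trailing 512 bytes may be
  // corrupt, so the block and meta files are truncated back to 512 and the
  // cached checksum state is reset (setLastChecksumAndDataLen(512, null)).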

  @Override // FsDatasetSpi
  public synchronized ReplicaInPipeline convertTemporaryToRbw(
      final ExtendedBlock b) throws IOException {
    final long blockId = b.getBlockId();
    final long expectedGs = b.getGenerationStamp();
    final long visible = b.getNumBytes();
    LOG.info("Convert " + b + " from Temporary to RBW, visible length="
        + visible);

    final ReplicaInPipeline temp;
    {
      // get replica
      final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
      if (r == null) {
        throw new ReplicaNotFoundException(
            ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
      }
      // check the replica's state
      if (r.getState() != ReplicaState.TEMPORARY) {
        throw new ReplicaAlreadyExistsException(
            "r.getState() != ReplicaState.TEMPORARY, r=" + r);
      }
      temp = (ReplicaInPipeline)r;
    }
    // check generation stamp
    if (temp.getGenerationStamp() != expectedGs) {
      throw new ReplicaAlreadyExistsException(
          "temp.getGenerationStamp() != expectedGs = " + expectedGs
          + ", temp=" + temp);
    }

    // TODO: check writer?
    // set writer to the current thread
    // temp.setWriter(Thread.currentThread());

    // check length
    final long numBytes = temp.getNumBytes();
    if (numBytes < visible) {
      throw new IOException(numBytes + " = numBytes < visible = "
          + visible + ", temp=" + temp);
    }
    // check volume
    final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
    if (v == null) {
      throw new IOException("r.getVolume() = null, temp=" + temp);
    }

    // move block files to the rbw directory
    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
        bpslice.getRbwDir());
    // create RBW
    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
        blockId, numBytes, expectedGs,
        v, dest.getParentFile(), Thread.currentThread(), 0);
    rbw.setBytesAcked(visible);
    // overwrite the RBW in the volume map
    volumeMap.add(b.getBlockPoolId(), rbw);
    return rbw;
  }

  @Override // FsDatasetSpi
  public ReplicaHandler createTemporary(
      StorageType storageType, ExtendedBlock b) throws IOException {
    long startTimeMs = Time.monotonicNow();
    long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
    ReplicaInfo lastFoundReplicaInfo = null;
    do {
      synchronized (this) {
        ReplicaInfo currentReplicaInfo =
            volumeMap.get(b.getBlockPoolId(), b.getBlockId());
        if (currentReplicaInfo == lastFoundReplicaInfo) {
          if (lastFoundReplicaInfo != null) {
            invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
          }
          FsVolumeReference ref =
              volumes.getNextVolume(storageType, b.getNumBytes());
          FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
          // create a temporary file to hold block in the designated volume
          File f;
          try {
            f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
          } catch (IOException e) {
            IOUtils.cleanup(null, ref);
            throw e;
          }
          ReplicaInPipeline newReplicaInfo =
              new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
                  f.getParentFile(), 0);
          volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
          return new ReplicaHandler(newReplicaInfo, ref);
        } else {
          if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
              && currentReplicaInfo instanceof ReplicaInPipeline)) {
            throw new ReplicaAlreadyExistsException("Block " + b
                + " already exists in state " + currentReplicaInfo.getState()
                + " and thus cannot be created.");
          }
          lastFoundReplicaInfo = currentReplicaInfo;
        }
      }

      // Waited too long; just bail out. This is not supposed to happen.
      long writerStopMs = Time.monotonicNow() - startTimeMs;
      if (writerStopMs > writerStopTimeoutMs) {
        LOG.warn("Unable to stop existing writer for block " + b + " after "
            + writerStopMs + " milliseconds.");
        throw new IOException("Unable to stop existing writer for block " + b
            + " after " + writerStopMs + " milliseconds.");
      }

      // Stop the previous writer
      ((ReplicaInPipeline) lastFoundReplicaInfo)
          .stopWriter(writerStopTimeoutMs);
    } while (true);
  }
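
  // How the loop above converges (informal sketch): on the first pass,
  // lastFoundReplicaInfo is null; if a stale ReplicaInPipeline with an older
  // generation stamp is found, it is remembered and its writer is stopped
  // outside the lock. On the next pass, if the map still holds the same
  // (now writer-less) replica, it is invalidated and the new temporary
  // replica is created. If a different replica appeared in between, the
  // handshake restarts, bounded overall by the xceiver stop timeout.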

  /**
   * Sets the offset in the meta file so that the
   * last checksum will be overwritten.
   */
  @Override // FsDatasetSpi
  public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
      int checksumSize) throws IOException {
    FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
    FileChannel channel = file.getChannel();
    long oldPos = channel.position();
    long newPos = oldPos - checksumSize;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Changing meta file offset of block " + b + " from " +
          oldPos + " to " + newPos);
    }
    channel.position(newPos);
  }

  //
  // REMIND - mjc - eventually we should have a timeout system
  // in place to clean up block files left by abandoned clients.
  // We should have some timer in place, so that if a blockfile
  // is created but never becomes valid, and has been idle for >48 hours,
  // we can GC it safely.
  //

  /**
   * Complete the block write!
   */
  @Override // FsDatasetSpi
  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
    if (Thread.interrupted()) {
      // Don't allow data modifications from interrupted threads
      throw new IOException("Cannot finalize block from Interrupted Thread");
    }
    ReplicaInfo replicaInfo = getReplicaInfo(b);
    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
      // this is legal, when recovery happens on a file that has
      // been opened for append but never modified
      return;
    }
    finalizeReplica(b.getBlockPoolId(), replicaInfo);
  }

  private synchronized FinalizedReplica finalizeReplica(String bpid,
      ReplicaInfo replicaInfo) throws IOException {
    FinalizedReplica newReplicaInfo = null;
    if (replicaInfo.getState() == ReplicaState.RUR &&
       ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
         ReplicaState.FINALIZED) {
      newReplicaInfo = (FinalizedReplica)
             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
    } else {
      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
      File f = replicaInfo.getBlockFile();
      if (v == null) {
        throw new IOException("No volume for temporary file " + f +
            " for block " + replicaInfo);
      }

      File dest = v.addFinalizedBlock(
          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());

      if (v.isTransientStorage()) {
        ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
      }
    }
    volumeMap.add(bpid, newReplicaInfo);

    return newReplicaInfo;
  }

  /**
   * Remove the temporary block file (if any).
   */
  @Override // FsDatasetSpi
  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
        b.getLocalBlock());
    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
      // remove from volumeMap
      volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());

      // delete the on-disk temp file
      if (delBlockFromDisk(replicaInfo.getBlockFile(),
          replicaInfo.getMetaFile(), b.getLocalBlock())) {
        LOG.warn("Block " + b + " unfinalized and removed.");
      }
      if (replicaInfo.getVolume().isTransientStorage()) {
        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
      }
    }
  }

  /**
   * Remove a block from disk.
   * @param blockFile block file
   * @param metaFile block meta file
   * @param b a block
   * @return true if on-disk files are deleted; false otherwise
   */
  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
    if (blockFile == null) {
      LOG.warn("No file exists for block: " + b);
      return true;
    }

    if (!blockFile.delete()) {
      LOG.warn("Not able to delete the block file: " + blockFile);
      return false;
    } else { // remove the meta file
      if (metaFile != null && !metaFile.delete()) {
        LOG.warn("Not able to delete the meta block file: " + metaFile);
        return false;
      }
    }
    return true;
  }

  @Override
  public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
    Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
        new HashMap<DatanodeStorage, BlockListAsLongs>();

    Map<String, BlockListAsLongs.Builder> builders =
        new HashMap<String, BlockListAsLongs.Builder>();

    List<FsVolumeImpl> curVolumes = getVolumes();
    for (FsVolumeSpi v : curVolumes) {
      builders.put(v.getStorageID(), BlockListAsLongs.builder());
    }

    synchronized(this) {
      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
        switch(b.getState()) {
          case FINALIZED:
          case RBW:
          case RWR:
            builders.get(b.getVolume().getStorageID()).add(b);
            break;
          case RUR:
            ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
            builders.get(rur.getVolume().getStorageID())
                .add(rur.getOriginalReplica());
            break;
          case TEMPORARY:
            break;
          default:
            assert false : "Illegal ReplicaInfo state.";
        }
      }
    }

    for (FsVolumeImpl v : curVolumes) {
      blockReportsMap.put(v.toDatanodeStorage(),
                          builders.get(v.getStorageID()).build());
    }

    return blockReportsMap;
  }
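
  // Report shape produced above (informal): one BlockListAsLongs per storage,
  // keyed by DatanodeStorage. FINALIZED, RBW and RWR replicas are reported
  // as-is, an RUR replica is reported through its original replica, and
  // TEMPORARY replicas are omitted because they are not yet visible to the
  // namenode.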

  @Override // FsDatasetSpi
  public List<Long> getCacheReport(String bpid) {
    return cacheManager.getCachedBlocks(bpid);
  }

  /**
   * Get the list of finalized blocks from the in-memory blockmap for a block pool.
   */
  @Override
  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
    ArrayList<FinalizedReplica> finalized =
        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
      if (b.getState() == ReplicaState.FINALIZED) {
        finalized.add(new FinalizedReplica((FinalizedReplica)b));
      }
    }
    return finalized;
  }

  /**
   * Get the list of finalized blocks on persistent storage from the
   * in-memory blockmap for a block pool.
   */
  @Override
  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
    ArrayList<FinalizedReplica> finalized =
        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
      if (!b.getVolume().isTransientStorage() &&
          b.getState() == ReplicaState.FINALIZED) {
        finalized.add(new FinalizedReplica((FinalizedReplica)b));
      }
    }
    return finalized;
  }

  /**
   * Check if a block is valid.
   *
   * @param b           The block to check.
   * @param minLength   The minimum length that the block must have.  May be 0.
   * @param state       The state that the replica must have.
   *
   * @throws ReplicaNotFoundException          If the replica is not found
   *
   * @throws UnexpectedReplicaStateException   If the replica is not in the
   *                                             expected state.
   * @throws FileNotFoundException             If the block file is not found or there
   *                                              was an error locating it.
   * @throws EOFException                      If the replica length is too short.
   *
   * @throws IOException                       May be thrown from the methods called.
   */
  public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
      throws ReplicaNotFoundException, UnexpectedReplicaStateException,
      FileNotFoundException, EOFException, IOException {
    final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
        b.getLocalBlock());
    if (replicaInfo == null) {
      throw new ReplicaNotFoundException(b);
    }
    if (replicaInfo.getState() != state) {
      throw new UnexpectedReplicaStateException(b, state);
    }
    if (!replicaInfo.getBlockFile().exists()) {
      throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
    }
    long onDiskLength = getLength(b);
    if (onDiskLength < minLength) {
      throw new EOFException(b + "'s on-disk length " + onDiskLength
          + " is shorter than minLength " + minLength);
    }
  }

  /**
   * Check whether the given block is a valid one.
   * Valid means finalized.
   */
  @Override // FsDatasetSpi
  public boolean isValidBlock(ExtendedBlock b) {
    return isValid(b, ReplicaState.FINALIZED);
  }

  /**
   * Check whether the given block is a valid RBW.
   */
  @Override // {@link FsDatasetSpi}
  public boolean isValidRbw(final ExtendedBlock b) {
    return isValid(b, ReplicaState.RBW);
  }

  /** Does the block exist and have the given state? */
  private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
    try {
      checkBlock(b, 0, state);
    } catch (IOException e) {
      return false;
    }
    return true;
  }

  /**
   * Find the file corresponding to the block and return it if it exists.
   */
  File validateBlockFile(String bpid, long blockId) {
    //Should we check for metadata file too?
    final File f;
    synchronized(this) {
      f = getFile(bpid, blockId, false);
    }

    if (f != null) {
      if (f.exists()) {
        return f;
      }

      // if file is not null, but doesn't exist - possibly disk failed
      datanode.checkDiskErrorAsync();
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("blockId=" + blockId + ", f=" + f);
    }
    return null;
  }

  /** Check the files of a replica. */
  static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
    //check replica's file
    final File f = r.getBlockFile();
    if (!f.exists()) {
      throw new FileNotFoundException("File " + f + " not found, r=" + r);
    }
    if (r.getBytesOnDisk() != f.length()) {
      throw new IOException("File length mismatched.  The length of "
          + f + " is " + f.length() + " but r=" + r);
    }

    //check replica's meta file
    final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp());
    if (!metafile.exists()) {
      throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
    }
    if (metafile.length() == 0) {
      throw new IOException("Metafile " + metafile + " is empty, r=" + r);
    }
  }

  /**
   * We're informed that a block is no longer valid. We could lazily
   * garbage-collect the block, but why bother? Just get rid of it.
   */
  @Override // FsDatasetSpi
  public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
    final List<String> errors = new ArrayList<String>();
    for (int i = 0; i < invalidBlks.length; i++) {
      final File f;
      final FsVolumeImpl v;
      synchronized (this) {
        final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
        if (info == null) {
          // It is okay if the block is not found -- it may have been deleted earlier.
          LOG.info("Failed to delete replica " + invalidBlks[i]
              + ": ReplicaInfo not found.");
          continue;
        }
        if (info.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
          errors.add("Failed to delete replica " + invalidBlks[i]
              + ": GenerationStamp not matched, info=" + info);
          continue;
        }
        f = info.getBlockFile();
        v = (FsVolumeImpl)info.getVolume();
        if (v == null) {
          errors.add("Failed to delete replica " + invalidBlks[i]
              + ". No volume for this replica, file=" + f);
          continue;
        }
        File parent = f.getParentFile();
        if (parent == null) {
          errors.add("Failed to delete replica " + invalidBlks[i]
              + ". Parent not found for file " + f);
          continue;
        }
        ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
        addDeletingBlock(bpid, removing.getBlockId());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Block file " + removing.getBlockFile().getName()
              + " is to be deleted");
        }
      }

      if (v.isTransientStorage()) {
        RamDiskReplica replicaInfo =
            ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
        if (replicaInfo != null) {
          if (!replicaInfo.getIsPersisted()) {
            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
          }
          ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
              replicaInfo.getBlockId(), true);
        }
      }

      // If a DFSClient has the replica in its cache of short-circuit file
      // descriptors (and the client is using ShortCircuitShm), invalidate it.
      datanode.getShortCircuitRegistry().processBlockInvalidation(
          new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));

      // If the block is cached, start uncaching it.
      cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());

      // Delete the block asynchronously to make sure we can do it fast enough.
      // It's ok to unlink the block file before the uncache operation
      // finishes.
      try {
        asyncDiskService.deleteAsync(v.obtainReference(), f,
            FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
            new ExtendedBlock(bpid, invalidBlks[i]),
            dataStorage.getTrashDirectoryForBlockFile(bpid, f));
      } catch (ClosedChannelException e) {
        LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
            "block " + invalidBlks[i]);
      }
    }
    if (!errors.isEmpty()) {
      StringBuilder b = new StringBuilder("Failed to delete ")
          .append(errors.size()).append(" (out of ").append(invalidBlks.length)
          .append(") replica(s):");
      for (int i = 0; i < errors.size(); i++) {
        b.append("\n").append(i).append(") ").append(errors.get(i));
      }
      throw new IOException(b.toString());
    }
  }

  /**
   * Invalidate a block without deleting the actual on-disk block file.
   *
   * It should only be used when deactivating disks.
   *
   * @param bpid the block pool ID.
   * @param block The block to be invalidated.
   */
  public void invalidate(String bpid, ReplicaInfo block) {
    // If a DFSClient has the replica in its cache of short-circuit file
    // descriptors (and the client is using ShortCircuitShm), invalidate it.
    datanode.getShortCircuitRegistry().processBlockInvalidation(
        new ExtendedBlockId(block.getBlockId(), bpid));

    // If the block is cached, start uncaching it.
    cacheManager.uncacheBlock(bpid, block.getBlockId());

    datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
        block.getStorageUuid());
  }

  /**
   * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
   */
  private void cacheBlock(String bpid, long blockId) {
    FsVolumeImpl volume;
    String blockFileName;
    long length, genstamp;
    Executor volumeExecutor;

    synchronized (this) {
      ReplicaInfo info = volumeMap.get(bpid, blockId);
      boolean success = false;
      try {
        if (info == null) {
          LOG.warn("Failed to cache block with id " + blockId + ", pool " +
              bpid + ": ReplicaInfo not found.");
          return;
        }
        if (info.getState() != ReplicaState.FINALIZED) {
          LOG.warn("Failed to cache block with id " + blockId + ", pool " +
              bpid + ": replica is not finalized; it is in state " +
              info.getState());
          return;
        }
        try {
          volume = (FsVolumeImpl)info.getVolume();
          if (volume == null) {
            LOG.warn("Failed to cache block with id " + blockId + ", pool " +
                bpid + ": volume not found.");
            return;
          }
        } catch (ClassCastException e) {
          LOG.warn("Failed to cache block with id " + blockId +
              ": volume was not an instance of FsVolumeImpl.");
          return;
        }
        if (volume.isTransientStorage()) {
          LOG.warn("Caching not supported on block with id " + blockId +
              " since the volume is backed by RAM.");
          return;
        }
        success = true;
      } finally {
        if (!success) {
          cacheManager.numBlocksFailedToCache.incrementAndGet();
        }
      }
      blockFileName = info.getBlockFile().getAbsolutePath();
      length = info.getVisibleLength();
      genstamp = info.getGenerationStamp();
      volumeExecutor = volume.getCacheExecutor();
    }
    cacheManager.cacheBlock(blockId, bpid,
        blockFileName, length, genstamp, volumeExecutor);
  }

  @Override // FsDatasetSpi
  public void cache(String bpid, long[] blockIds) {
    for (int i = 0; i < blockIds.length; i++) {
      cacheBlock(bpid, blockIds[i]);
    }
  }

  @Override // FsDatasetSpi
  public void uncache(String bpid, long[] blockIds) {
    for (int i = 0; i < blockIds.length; i++) {
      cacheManager.uncacheBlock(bpid, blockIds[i]);
    }
  }

  @Override
  public boolean isCached(String bpid, long blockId) {
    return cacheManager.isCached(bpid, blockId);
  }

  @Override // FsDatasetSpi
  public synchronized boolean contains(final ExtendedBlock block) {
    final long blockId = block.getLocalBlock().getBlockId();
    return getFile(block.getBlockPoolId(), blockId, false) != null;
  }

  /**
   * Turn the block identifier into a filename.
   * @param bpid Block pool Id
   * @param blockId a block's id
   * @param touch if true, register an access to a RAM disk replica
   * @return on disk data file path; null if the replica does not exist
   */
  File getFile(final String bpid, final long blockId, boolean touch) {
    ReplicaInfo info = volumeMap.get(bpid, blockId);
    if (info != null) {
      if (touch && info.getVolume().isTransientStorage()) {
        ramDiskReplicaTracker.touch(bpid, blockId);
        datanode.getMetrics().incrRamDiskBlocksReadHits();
      }
      return info.getBlockFile();
    }
    return null;
  }

  /**
   * Check if a data directory is healthy.
   *
   * If some volumes failed, the caller must remove all the blocks that
   * belong to these failed volumes.
   * @return the failed volumes. Returns null if no volume failed.
   */
  @Override // FsDatasetSpi
  public Set<File> checkDataDir() {
    return volumes.checkDirs();
  }

  @Override // FsDatasetSpi
  public String toString() {
    return "FSDataset{dirpath='" + volumes + "'}";
  }

  private ObjectName mbeanName;

  /**
   * Register the FSDataset MBean using the name
   *        "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>"
   */
  void registerMBean(final String datanodeUuid) {
    // We wrap to bypass standard mbean naming convention.
    // This wrapping can be removed in java 6 as it is more flexible in
    // package naming for mbeans and their impl.
    try {
      StandardMBean bean = new StandardMBean(this, FSDatasetMBean.class);
      mbeanName = MBeans.register("DataNode", "FSDatasetState-" + datanodeUuid, bean);
    } catch (NotCompliantMBeanException e) {
      LOG.warn("Error registering FSDatasetState MBean", e);
    }
    LOG.info("Registered FSDatasetState MBean");
  }

  @Override // FsDatasetSpi
  public void shutdown() {
    fsRunning = false;

    ((LazyWriter) lazyWriter.getRunnable()).stop();
    lazyWriter.interrupt();

    if (mbeanName != null) {
      MBeans.unregister(mbeanName);
    }

    if (asyncDiskService != null) {
      asyncDiskService.shutdown();
    }

    if (asyncLazyPersistService != null) {
      asyncLazyPersistService.shutdown();
    }

    if (volumes != null) {
      volumes.shutdown();
    }

    try {
      lazyWriter.join();
    } catch (InterruptedException ie) {
      LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
               "from LazyWriter.join");
    }
  }

  @Override // FSDatasetMBean
  public String getStorageInfo() {
    return toString();
  }

  /**
   * Reconcile the difference between blocks on the disk and blocks in
   * volumeMap.
   *
   * Check the given block for inconsistencies. Look at the
   * current state of the block and reconcile the differences as follows:
   * <ul>
   * <li>If the block file is missing, delete the block from volumeMap</li>
   * <li>If the block file exists and the block is missing in volumeMap,
   * add the block to volumeMap</li>
   * <li>If generation stamp does not match, then update the block with right
   * generation stamp</li>
   * <li>If the block length in memory does not match the actual block file length
   * then mark the block as corrupt and update the block length in memory</li>
   * <li>If the file in {@link ReplicaInfo} does not match the file on
   * the disk, update {@link ReplicaInfo} with the correct file</li>
   * </ul>
   *
   * @param bpid block pool ID
   * @param blockId Block that differs
   * @param diskFile Block file on the disk
   * @param diskMetaFile Metadata file found on the disk
   * @param vol Volume of the block file
   */
  @Override
  public void checkAndUpdate(String bpid, long blockId, File diskFile,
      File diskMetaFile, FsVolumeSpi vol) throws IOException {
    Block corruptBlock = null;
    ReplicaInfo memBlockInfo;
    synchronized (this) {
      memBlockInfo = volumeMap.get(bpid, blockId);
      if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
        // Block is not finalized - ignore the difference
        return;
      }

      final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
          Block.getGenerationStamp(diskMetaFile.getName()) :
            GenerationStamp.GRANDFATHER_GENERATION_STAMP;

      if (diskFile == null || !diskFile.exists()) {
        if (memBlockInfo == null) {
          // Block file does not exist and block does not exist in memory
          // If metadata file exists then delete it
          if (diskMetaFile != null && diskMetaFile.exists()
              && diskMetaFile.delete()) {
            LOG.warn("Deleted a metadata file without a block "
                + diskMetaFile.getAbsolutePath());
          }
          return;
        }
        if (!memBlockInfo.getBlockFile().exists()) {
          // Block is in memory and not on the disk
          // Remove the block from volumeMap
          volumeMap.remove(bpid, blockId);
          if (vol.isTransientStorage()) {
            ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
          }
          LOG.warn("Removed block " + blockId
              + " from memory with missing block file on the disk");
          // Finally remove the metadata file
          if (diskMetaFile != null && diskMetaFile.exists()
              && diskMetaFile.delete()) {
            LOG.warn("Deleted a metadata file for the deleted block "
                + diskMetaFile.getAbsolutePath());
          }
        }
        return;
      }
      /*
       * Block file exists on the disk
       */
      if (memBlockInfo == null) {
        // Block is missing in memory - add the block to volumeMap
        ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
            diskFile.length(), diskGS, vol, diskFile.getParentFile());
        volumeMap.add(bpid, diskBlockInfo);
        if (vol.isTransientStorage()) {
          ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
        }
        LOG.warn("Added missing block to memory " + diskBlockInfo);
        return;
      }
      /*
       * Block exists in volumeMap and the block file exists on the disk
       */
      // Compare block files
      File memFile = memBlockInfo.getBlockFile();
      if (memFile.exists()) {
        if (memFile.compareTo(diskFile) != 0) {
          if (diskMetaFile.exists()) {
            if (memBlockInfo.getMetaFile().exists()) {
              // We have two sets of block+meta files. Decide which one to
              // keep.
              ReplicaInfo diskBlockInfo = new FinalizedReplica(
                  blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
              ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
                  memBlockInfo, diskBlockInfo, volumeMap);
            }
          } else {
            if (!diskFile.delete()) {
              LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
            }
          }
        }
      } else {
        // Block refers to a block file that does not exist.
        // Update the block with the file found on the disk. Since the block
        // file and metadata file are found as a pair on the disk, update
        // the block based on the metadata file found on the disk
        LOG.warn("Block file in volumeMap "
            + memFile.getAbsolutePath()
            + " does not exist. Updating it to the file found during scan "
            + diskFile.getAbsolutePath());
        memBlockInfo.setDir(diskFile.getParentFile());
        memFile = diskFile;

        LOG.warn("Updating generation stamp for block " + blockId
            + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
        memBlockInfo.setGenerationStamp(diskGS);
      }

      // Compare generation stamp
      if (memBlockInfo.getGenerationStamp() != diskGS) {
        File memMetaFile = FsDatasetUtil.getMetaFile(diskFile,
            memBlockInfo.getGenerationStamp());
        if (memMetaFile.exists()) {
          if (memMetaFile.compareTo(diskMetaFile) != 0) {
            LOG.warn("Metadata file in memory "
                + memMetaFile.getAbsolutePath()
                + " does not match file found by scan "
                + (diskMetaFile == null? null: diskMetaFile.getAbsolutePath()));
          }
        } else {
          // Metadata file corresponding to block in memory is missing
          // If metadata file found during the scan is on the same directory
          // as the block file, then use the generation stamp from it
          long gs = diskMetaFile != null && diskMetaFile.exists()
              && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
              : GenerationStamp.GRANDFATHER_GENERATION_STAMP;

          LOG.warn("Updating generation stamp for block " + blockId
              + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);

          memBlockInfo.setGenerationStamp(gs);
        }
      }

      // Compare block size
      if (memBlockInfo.getNumBytes() != memFile.length()) {
        // Update the length based on the block file
        corruptBlock = new Block(memBlockInfo);
        LOG.warn("Updating size of block " + blockId + " from "
            + memBlockInfo.getNumBytes() + " to " + memFile.length());
        memBlockInfo.setNumBytes(memFile.length());
      }
    }

    // Send corrupt block report outside the lock
    if (corruptBlock != null) {
      LOG.warn("Reporting the block " + corruptBlock
          + " as corrupt due to length mismatch");
      try {
        datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock));
      } catch (IOException e) {
        LOG.warn("Failed to report bad block " + corruptBlock, e);
      }
    }
  }

  /**
   * @deprecated use {@link #fetchReplicaInfo(String, long)} instead.
   */
  @Override // FsDatasetSpi
  @Deprecated
  public ReplicaInfo getReplica(String bpid, long blockId) {
    return volumeMap.get(bpid, blockId);
  }

  @Override
  public synchronized String getReplicaString(String bpid, long blockId) {
    final Replica r = volumeMap.get(bpid, blockId);
    return r == null? "null": r.toString();
  }

  @Override // FsDatasetSpi
  public synchronized ReplicaRecoveryInfo initReplicaRecovery(
      RecoveringBlock rBlock) throws IOException {
    return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(), volumeMap,
        rBlock.getBlock().getLocalBlock(), rBlock.getNewGenerationStamp(),
        datanode.getDnConf().getXceiverStopTimeout());
  }

  /** Static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
  static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
      Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
    final ReplicaInfo replica = map.get(bpid, block.getBlockId());
    LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
        + ", replica=" + replica);

    //check replica
    if (replica == null) {
      return null;
    }

    //stop writer if there is any
    if (replica instanceof ReplicaInPipeline) {
      final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
      rip.stopWriter(xceiverStopTimeout);

      //check replica bytes on disk.
      if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
            + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
      }

      //check the replica's files
      checkReplicaFiles(rip);
    }

    //check generation stamp
    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
          + block + ", replica=" + replica);
    }

    //check recovery id
    if (replica.getGenerationStamp() >= recoveryId) {
      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
          + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
          + ", block=" + block + ", replica=" + replica);
    }

    //check RUR
    final ReplicaUnderRecovery rur;
    if (replica.getState() == ReplicaState.RUR) {
      rur = (ReplicaUnderRecovery)replica;
      if (rur.getRecoveryID() >= recoveryId) {
        throw new RecoveryInProgressException(
            "rur.getRecoveryID() >= recoveryId = " + recoveryId
            + ", block=" + block + ", rur=" + rur);
      }
      final long oldRecoveryID = rur.getRecoveryID();
      rur.setRecoveryID(recoveryId);
      LOG.info("initReplicaRecovery: update recovery id for " + block
          + " from " + oldRecoveryID + " to " + recoveryId);
    } else {
      rur = new ReplicaUnderRecovery(replica, recoveryId);
      map.add(bpid, rur);
      LOG.info("initReplicaRecovery: changing replica state for "
          + block + " from " + replica.getState()
          + " to " + rur.getState());
    }
    return rur.createInfo();
  }
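
  // Generation-stamp ordering enforced above, with hypothetical values: for a
  // replica at GS = 5, a recovery with recoveryId = 6 may proceed (replica GS
  // < recovery id), while recoveryId <= 5 is rejected as a stale attempt, and
  // a block whose GS exceeds 5 is rejected because the replica would be older
  // than the data being recovered.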

  @Override // FsDatasetSpi
  public synchronized String updateReplicaUnderRecovery(
                                    final ExtendedBlock oldBlock,
                                    final long recoveryId,
                                    final long newBlockId,
                                    final long newlength) throws IOException {
    //get replica
    final String bpid = oldBlock.getBlockPoolId();
    final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
    LOG.info("updateReplica: " + oldBlock
                 + ", recoveryId=" + recoveryId
                 + ", length=" + newlength
                 + ", replica=" + replica);

    //check replica
    if (replica == null) {
      throw new ReplicaNotFoundException(oldBlock);
    }

    //check replica state
    if (replica.getState() != ReplicaState.RUR) {
      throw new IOException("replica.getState() != " + ReplicaState.RUR
          + ", replica=" + replica);
    }

    //check replica's bytes on disk
    if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
          + " replica.getBytesOnDisk() != block.getNumBytes(), block="
          + oldBlock + ", replica=" + replica);
    }

    //check replica files before update
    checkReplicaFiles(replica);

    //update replica
    final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
        .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
        newBlockId, newlength);

    boolean copyTruncate = newBlockId != oldBlock.getBlockId();
    if (!copyTruncate) {
      assert finalized.getBlockId() == oldBlock.getBlockId()
          && finalized.getGenerationStamp() == recoveryId
          && finalized.getNumBytes() == newlength
          : "Replica information mismatched: oldBlock=" + oldBlock
              + ", recoveryId=" + recoveryId + ", newlength=" + newlength
              + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
    } else {
      assert finalized.getBlockId() == oldBlock.getBlockId()
          && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
          && finalized.getNumBytes() == oldBlock.getNumBytes()
          : "Finalized and old information mismatched: oldBlock=" + oldBlock
              + ", genStamp=" + oldBlock.getGenerationStamp()
              + ", len=" + oldBlock.getNumBytes()
              + ", finalized=" + finalized;
    }

    //check replica files after update
    checkReplicaFiles(finalized);

    //return storage ID
    return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
  }

  private FinalizedReplica updateReplicaUnderRecovery(
                                          String bpid,
                                          ReplicaUnderRecovery rur,
                                          long recoveryId,
                                          long newBlockId,
                                          long newlength) throws IOException {
    //check recovery id
    if (rur.getRecoveryID() != recoveryId) {
      throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
          + ", rur=" + rur);
    }

    boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
    File blockFile;
    File metaFile;
    // bump rur's GS to be recovery id
    if (!copyOnTruncate) {
      bumpReplicaGS(rur, recoveryId);
      blockFile = rur.getBlockFile();
      metaFile = rur.getMetaFile();
    } else {
      File[] copiedReplicaFiles =
          copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
      blockFile = copiedReplicaFiles[1];
      metaFile = copiedReplicaFiles[0];
    }

    //update length
    if (rur.getNumBytes() < newlength) {
      throw new IOException("rur.getNumBytes() < newlength = " + newlength
          + ", rur=" + rur);
    }
    if (rur.getNumBytes() > newlength) {
      rur.unlinkBlock(1);
      truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
      if (!copyOnTruncate) {
        // update RUR with the new length
        rur.setNumBytes(newlength);
      } else {
        // Copying block to a new block with new blockId.
        // Not truncating original block.
        ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
            newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(),
            newlength);
        newReplicaInfo.setNumBytes(newlength);
        volumeMap.add(bpid, newReplicaInfo);
        finalizeReplica(bpid, newReplicaInfo);
      }
    }

    // finalize the block
    return finalizeReplica(bpid, rur);
  }
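
  // Copy-on-truncate in the method above, informally: when newBlockId differs
  // from the replica's own id, the original block file is left untouched and
  // a copy is truncated to the recovered length under the new block id, so
  // the old block remains readable while the truncated one is finalized
  // separately.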

  private File[] copyReplicaWithNewBlockIdAndGS(
      ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)
      throws IOException {
    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
    FsVolumeReference v = volumes.getNextVolume(
        replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes());
    final File tmpDir = ((FsVolumeImpl) v.getVolume())
        .getBlockPoolSlice(bpid).getTmpDir();
    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
    final File dstBlockFile = new File(destDir, blockFileName);
    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
    return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
        dstMetaFile, dstBlockFile, true);
  }

  @Override // FsDatasetSpi
  public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
      throws IOException {
    final Replica replica = getReplicaInfo(block.getBlockPoolId(),
        block.getBlockId());
    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
          + block + ", replica=" + replica);
    }
    return replica.getVisibleLength();
  }

  @Override
  public void addBlockPool(String bpid, Configuration conf)
      throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      volumes.addBlockPool(bpid, conf);
      volumeMap.initBlockPool(bpid);
    }
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }

  @Override
  public synchronized void shutdownBlockPool(String bpid) {
    LOG.info("Removing block pool " + bpid);
    volumeMap.cleanUpBlockPool(bpid);
    volumes.removeBlockPool(bpid);
  }

  /**
   * Class for representing the Datanode volume information
   */
  private static class VolumeInfo {
    final String directory;
    final long usedSpace; // size of space used by HDFS
    final long freeSpace; // size of free space excluding reserved space
    final long reservedSpace; // size of space reserved for non-HDFS and RBW

    VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
      this.directory = v.toString();
      this.usedSpace = usedSpace;
      this.freeSpace = freeSpace;
      this.reservedSpace = v.getReserved();
    }
  }

  private Collection<VolumeInfo> getVolumeInfo() {
    Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
    for (FsVolumeImpl volume : getVolumes()) {
      long used = 0;
      long free = 0;
      try (FsVolumeReference ref = volume.obtainReference()) {
        used = volume.getDfsUsed();
        free = volume.getAvailable();
      } catch (ClosedChannelException e) {
        continue;
      } catch (IOException e) {
        LOG.warn(e.getMessage());
        used = 0;
        free = 0;
      }

      info.add(new VolumeInfo(volume, used, free));
    }
    return info;
  }

  @Override
  public Map<String, Object> getVolumeInfoMap() {
    final Map<String, Object> info = new HashMap<String, Object>();
    Collection<VolumeInfo> volumes = getVolumeInfo();
    for (VolumeInfo v : volumes) {
      final Map<String, Object> innerInfo = new HashMap<String, Object>();
      innerInfo.put("usedSpace", v.usedSpace);
      innerInfo.put("freeSpace", v.freeSpace);
      innerInfo.put("reservedSpace", v.reservedSpace);
      info.put(v.directory, innerInfo);
    }
    return info;
  }

  @Override // FsDatasetSpi
  public synchronized void deleteBlockPool(String bpid, boolean force)
      throws IOException {
    List<FsVolumeImpl> curVolumes = getVolumes();
    if (!force) {
      for (FsVolumeImpl volume : curVolumes) {
        try (FsVolumeReference ref = volume.obtainReference()) {
          if (!volume.isBPDirEmpty(bpid)) {
            LOG.warn(bpid + " has some block files, cannot delete unless forced");
            throw new IOException("Cannot delete block pool, "
                + "it contains some block files");
          }
        } catch (ClosedChannelException e) {
          // ignore.
        }
      }
    }
    for (FsVolumeImpl volume : curVolumes) {
      try (FsVolumeReference ref = volume.obtainReference()) {
        volume.deleteBPDirectories(bpid, force);
      } catch (ClosedChannelException e) {
        // ignore.
      }
    }
  }

  @Override // FsDatasetSpi
  public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
      throws IOException {
    synchronized(this) {
      final Replica replica = volumeMap.get(block.getBlockPoolId(),
          block.getBlockId());
      if (replica == null) {
        throw new ReplicaNotFoundException(block);
      }
      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
        throw new IOException(
            "Replica generation stamp < block generation stamp, block="
            + block + ", replica=" + replica);
      } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
        block.setGenerationStamp(replica.getGenerationStamp());
      }
    }

    File datafile = getBlockFile(block);
    File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
    BlockLocalPathInfo info = new BlockLocalPathInfo(block,
        datafile.getAbsolutePath(), metafile.getAbsolutePath());
    return info;
  }

  @Override // FsDatasetSpi
  public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
      long[] blockIds) throws IOException {
    List<FsVolumeImpl> curVolumes = getVolumes();
    // List of VolumeIds, one per volume on the datanode
    List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(curVolumes.size());
    // List of indexes into the list of VolumeIds, pointing at the VolumeId of
    // the volume that the block is on
    List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
    // Initialize the list of VolumeIds simply by enumerating the volumes
    for (int i = 0; i < curVolumes.size(); i++) {
      blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
    }
    // Determine the index of the VolumeId of each block's volume, by comparing
    // the block's volume against the enumerated volumes
    for (int i = 0; i < blockIds.length; i++) {
      long blockId = blockIds[i];
      boolean isValid = false;

      ReplicaInfo info = volumeMap.get(poolId, blockId);
      int volumeIndex = 0;
      if (info != null) {
        FsVolumeSpi blockVolume = info.getVolume();
        for (FsVolumeImpl volume : curVolumes) {
          // This comparison of references should be safe
          if (blockVolume == volume) {
            isValid = true;
            break;
          }
          volumeIndex++;
        }
      }
      // Indicates that the block is not present, or not found in a data dir
      if (!isValid) {
        volumeIndex = Integer.MAX_VALUE;
      }
      blocksVolumeIndexes.add(volumeIndex);
    }
    return new HdfsBlocksMetadata(poolId, blockIds,
        blocksVolumeIds, blocksVolumeIndexes);
  }
2648 
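  // Trash and rolling upgrade marker state is owned by DataStorage; the
  // following methods simply delegate to it.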
  @Override
  public void enableTrash(String bpid) {
    dataStorage.enableTrash(bpid);
  }

  @Override
  public void clearTrash(String bpid) {
    dataStorage.clearTrash(bpid);
  }

  @Override
  public boolean trashEnabled(String bpid) {
    return dataStorage.trashEnabled(bpid);
  }

  @Override
  public void setRollingUpgradeMarker(String bpid) throws IOException {
    dataStorage.setRollingUpgradeMarker(bpid);
  }

  @Override
  public void clearRollingUpgradeMarker(String bpid) throws IOException {
    dataStorage.clearRollingUpgradeMarker(bpid);
  }

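  /**
   * Callback invoked once a RAM disk replica has been saved to persistent
   * storage: record the saved files with the replica tracker, charge their
   * combined length to the target volume, and update lazy persist metrics.
   */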
  @Override
  public void onCompleteLazyPersist(String bpId, long blockId,
      long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
    synchronized (FsDatasetImpl.this) {
      ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);

      targetVolume.incDfsUsed(bpId,
          savedFiles[0].length() + savedFiles[1].length());

      // Update metrics (ignore the metadata file size)
      datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
      datanode.getMetrics().incrRamDiskBytesLazyPersisted(
          savedFiles[1].length());
      datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
          Time.monotonicNow() - creationTime);

      if (LOG.isDebugEnabled()) {
        LOG.debug("LazyWriter: Finish persisting RamDisk block: "
            + " block pool Id: " + bpId + " block id: " + blockId
            + " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
            + " on target volume " + targetVolume);
      }
    }
  }

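  /**
   * Callback invoked when lazily persisting a RAM disk replica fails; the
   * replica is re-enqueued so the lazy writer can retry it later.
   */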
  @Override
  public void onFailLazyPersist(String bpId, long blockId) {
    RamDiskReplica block = ramDiskReplicaTracker.getReplica(bpId, blockId);
    if (block != null) {
      LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
      ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
    }
  }

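  /**
   * Submit an asynchronous sync_file_range request for the given file
   * descriptor to the volume hosting {@code block}.
   */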
  @Override
  public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
      FileDescriptor fd, long offset, long nbytes, int flags) {
    FsVolumeImpl fsVolumeImpl = this.getVolume(block);
    asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
        nbytes, flags);
  }

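  /**
   * @return true if at least one volume is backed by transient (RAM disk)
   * storage.
   */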
  private boolean ramDiskConfigured() {
    for (FsVolumeImpl v : getVolumes()) {
      if (v.isTransientStorage()) {
        return true;
      }
    }
    return false;
  }

  // Add or remove the per-DISK-volume async lazy persist threads when a
  // RamDisk volume is added or removed.
  // This should only be called when the FsDataSetImpl#volumes list is finalized.
  private void setupAsyncLazyPersistThreads() {
    for (FsVolumeImpl v : getVolumes()) {
      setupAsyncLazyPersistThread(v);
    }
  }

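  /**
   * Start or stop the async lazy persist thread for a single DISK volume,
   * depending on whether any RAM disk volume is currently configured.
   */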
  private void setupAsyncLazyPersistThread(final FsVolumeImpl v) {
    // Skip transient volumes
    if (v.isTransientStorage()) {
      return;
    }
    boolean ramDiskConfigured = ramDiskConfigured();
    // Add thread for DISK volume if RamDisk is configured
    if (ramDiskConfigured &&
        !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
      asyncLazyPersistService.addVolume(v.getCurrentDir());
    }

    // Remove thread for DISK volume if RamDisk is not configured
    if (!ramDiskConfigured &&
        asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
      asyncLazyPersistService.removeVolume(v.getCurrentDir());
    }
  }

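  /**
   * Remove the old on-RAM-disk replica after it has been migrated to
   * persistent storage: notify the NameNode and short-circuit clients of
   * the new location first, then delete the old files and release their
   * space accounting.
   */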
  private void removeOldReplica(ReplicaInfo replicaInfo,
      ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
      long blockFileUsed, long metaFileUsed, final String bpid) {
    // Before deleting the files from the old storage we must notify the
    // NN that the files are on the new storage. Else a blockReport from
    // the transient storage might cause the NN to think the blocks are lost.
    // Replicas must be evicted from client short-circuit caches, because the
    // storage will no longer be the same, and thus will require validating
    // checksum.  This also stops a client from holding file descriptors,
    // which would prevent the OS from reclaiming the memory.
    ExtendedBlock extendedBlock =
        new ExtendedBlock(bpid, newReplicaInfo);
    datanode.getShortCircuitRegistry().processBlockInvalidation(
        ExtendedBlockId.fromExtendedBlock(extendedBlock));
    datanode.notifyNamenodeReceivedBlock(
        extendedBlock, null, newReplicaInfo.getStorageUuid());

    // Remove the old replicas
    if (blockFile.delete() || !blockFile.exists()) {
      ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
      if (metaFile.delete() || !metaFile.exists()) {
        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
      }
    }

    // If deletion failed then the directory scanner will clean up the blocks
    // eventually.
  }

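  /**
   * Lazily persists replicas from transient (RAM disk) storage to disk and
   * evicts replicas when transient free space falls below the configured
   * low watermark. Runs until {@link #stop()} is called or the dataset
   * shuts down.
   */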
  class LazyWriter implements Runnable {
    private volatile boolean shouldRun = true;
    final int checkpointerInterval;
    final float lowWatermarkFreeSpacePercentage;
    final long lowWatermarkFreeSpaceBytes;

    public LazyWriter(Configuration conf) {
      this.checkpointerInterval = conf.getInt(
          DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
          DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
      this.lowWatermarkFreeSpacePercentage = conf.getFloat(
          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
      this.lowWatermarkFreeSpaceBytes = conf.getLong(
          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT);
    }

    /**
     * Checkpoint a pending replica to persistent storage now.
     * If we fail then move the replica to the end of the queue.
     * @return true if the persist was scheduled successfully (or there was
     * nothing to do), false on failure.
     */
    private boolean saveNextReplica() {
      RamDiskReplica block = null;
      FsVolumeReference targetReference;
      FsVolumeImpl targetVolume;
      ReplicaInfo replicaInfo;
      boolean succeeded = false;

      try {
        block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
        if (block != null) {
          synchronized (FsDatasetImpl.this) {
            replicaInfo = volumeMap.get(block.getBlockPoolId(),
                block.getBlockId());

            // If replicaInfo is null, the block was either deleted before
            // it could be checkpointed or it is already on persistent storage.
            // This can occur if a second replica on persistent storage was found
            // after the lazy write was scheduled.
            if (replicaInfo != null &&
                replicaInfo.getVolume().isTransientStorage()) {
              // Pick a target volume to persist the block.
              targetReference = volumes.getNextVolume(
                  StorageType.DEFAULT, replicaInfo.getNumBytes());
              targetVolume = (FsVolumeImpl) targetReference.getVolume();

              ramDiskReplicaTracker.recordStartLazyPersist(
                  block.getBlockPoolId(), block.getBlockId(), targetVolume);

              if (LOG.isDebugEnabled()) {
                LOG.debug("LazyWriter: Start persisting RamDisk block:"
                    + " block pool Id: " + block.getBlockPoolId()
                    + " block id: " + block.getBlockId()
                    + " on target volume " + targetVolume);
              }

              asyncLazyPersistService.submitLazyPersistTask(
                  block.getBlockPoolId(), block.getBlockId(),
                  replicaInfo.getGenerationStamp(), block.getCreationTime(),
                  replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
                  targetReference);
            }
          }
        }
        succeeded = true;
      } catch (IOException ioe) {
        LOG.warn("Exception saving replica " + block, ioe);
      } finally {
        if (!succeeded && block != null) {
          LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
          onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
        }
      }
      return succeeded;
    }

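    /**
     * @return true if the free space aggregated over all transient volumes
     * has fallen below the configured percentage or absolute byte watermark.
     */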
    private boolean transientFreeSpaceBelowThreshold() throws IOException {
      long free = 0;
      long capacity = 0;
      float percentFree;

      // Don't worry about fragmentation for now. We don't expect more than one
      // transient volume per DN.
      for (FsVolumeImpl v : getVolumes()) {
        try (FsVolumeReference ref = v.obtainReference()) {
          if (v.isTransientStorage()) {
            capacity += v.getCapacity();
            free += v.getAvailable();
          }
        } catch (ClosedChannelException e) {
          // ignore.
        }
      }

      if (capacity == 0) {
        return false;
      }

      percentFree = (float) ((double) free * 100 / capacity);
      return (percentFree < lowWatermarkFreeSpacePercentage) ||
          (free < lowWatermarkFreeSpaceBytes);
    }

    /**
     * Attempt to evict one or more transient block replicas until free
     * space on transient storage is back above the low watermark.
     */
    private void evictBlocks() throws IOException {
      int iterations = 0;

      while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
             transientFreeSpaceBelowThreshold()) {
        RamDiskReplica replicaState =
            ramDiskReplicaTracker.getNextCandidateForEviction();

        if (replicaState == null) {
          break;
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Evicting block " + replicaState);
        }

        ReplicaInfo replicaInfo, newReplicaInfo;
        File blockFile, metaFile;
        long blockFileUsed, metaFileUsed;
        final String bpid = replicaState.getBlockPoolId();

        synchronized (FsDatasetImpl.this) {
          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
              replicaState.getBlockId());
          Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
          blockFile = replicaInfo.getBlockFile();
          metaFile = replicaInfo.getMetaFile();
          blockFileUsed = blockFile.length();
          metaFileUsed = metaFile.length();
          ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
              replicaState.getBlockId(), false);

          // Move the replica from lazyPersist/ to finalized/ on the target volume
          BlockPoolSlice bpSlice =
              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
          File newBlockFile = bpSlice.activateSavedReplica(
              replicaInfo, replicaState.getSavedMetaFile(),
              replicaState.getSavedBlockFile());

          newReplicaInfo =
              new FinalizedReplica(replicaInfo.getBlockId(),
                                   replicaInfo.getBytesOnDisk(),
                                   replicaInfo.getGenerationStamp(),
                                   replicaState.getLazyPersistVolume(),
                                   newBlockFile.getParentFile());

          // Update the volumeMap entry.
          volumeMap.add(bpid, newReplicaInfo);

          // Update metrics
          datanode.getMetrics().incrRamDiskBlocksEvicted();
          datanode.getMetrics().addRamDiskBlocksEvictionWindowMs(
              Time.monotonicNow() - replicaState.getCreationTime());
          if (replicaState.getNumReads() == 0) {
            datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
          }
        }

        removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
            blockFileUsed, metaFileUsed, bpid);
      }
    }

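    /**
     * Main loop: persist the next queued replica and evict blocks from
     * transient storage as needed, sleeping for the checkpointer interval
     * whenever the queue is drained or no forward progress is being made.
     */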
    @Override
    public void run() {
      int numSuccessiveFailures = 0;

      while (fsRunning && shouldRun) {
        try {
          numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
          evictBlocks();

          // Sleep if we have no more work to do or if it looks like we are not
          // making any forward progress. This is to ensure that if all persist
          // operations are failing we don't keep retrying them in a tight loop.
          if (numSuccessiveFailures >=
              ramDiskReplicaTracker.numReplicasNotPersisted()) {
            Thread.sleep(checkpointerInterval * 1000);
            numSuccessiveFailures = 0;
          }
        } catch (InterruptedException e) {
          LOG.info("LazyWriter was interrupted, exiting");
          break;
        } catch (Exception e) {
          LOG.warn("Ignoring exception in LazyWriter:", e);
        }
      }
    }

    public void stop() {
      shouldRun = false;
    }
  }

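  /**
   * Pin a block by setting the sticky bit on its block file, if block
   * pinning is enabled.
   */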
  @Override
  public void setPinning(ExtendedBlock block) throws IOException {
    if (!blockPinningEnabled) {
      return;
    }

    File f = getBlockFile(block);
    Path p = new Path(f.getAbsolutePath());

    FsPermission oldPermission = localFS.getFileStatus(p).getPermission();
    // The sticky bit is used for pinning purposes
    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
    localFS.setPermission(p, permission);
  }

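  /**
   * @return true if block pinning is enabled and the sticky bit is set on
   * the block file.
   */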
  @Override
  public boolean getPinning(ExtendedBlock block) throws IOException {
    if (!blockPinningEnabled) {
      return false;
    }
    File f = getBlockFile(block);

    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
    return fss.getPermission().getStickyBit();
  }

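  /**
   * @return true if the given block is currently scheduled for deletion.
   */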
  @Override
  public boolean isDeletingBlock(String bpid, long blockId) {
    synchronized (deletingBlock) {
      Set<Long> s = deletingBlock.get(bpid);
      return s != null && s.contains(blockId);
    }
  }

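  /**
   * Drop the given block ids from the pending-deletion set for the block
   * pool, presumably once their asynchronous deletion has completed.
   */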
  public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
    synchronized (deletingBlock) {
      Set<Long> s = deletingBlock.get(bpid);
      if (s != null) {
        for (Long id : blockIds) {
          s.remove(id);
        }
      }
    }
  }

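  /**
   * Record that a block is scheduled for deletion so concurrent operations
   * can detect the pending delete via {@link #isDeletingBlock}.
   */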
  private void addDeletingBlock(String bpid, Long blockId) {
    synchronized (deletingBlock) {
      Set<Long> s = deletingBlock.get(bpid);
      if (s == null) {
        s = new HashSet<>();
        deletingBlock.put(bpid, s);
      }
      s.add(blockId);
    }
  }
}