1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
19 
20 import java.io.BufferedWriter;
21 import java.io.File;
22 import java.io.FileOutputStream;
23 import java.io.FilenameFilter;
24 import java.io.IOException;
25 import java.nio.channels.ClosedChannelException;
26 import java.io.OutputStreamWriter;
27 import java.nio.file.Files;
28 import java.nio.file.Paths;
29 import java.nio.file.StandardCopyOption;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicLong;
42 
43 import com.google.common.annotations.VisibleForTesting;
44 import com.google.common.base.Joiner;
45 import com.google.common.base.Preconditions;
46 import org.apache.hadoop.classification.InterfaceAudience;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.fs.DF;
49 import org.apache.hadoop.fs.FileUtil;
50 import org.apache.hadoop.fs.StorageType;
51 import org.apache.hadoop.hdfs.DFSConfigKeys;
52 import org.apache.hadoop.hdfs.protocol.Block;
53 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
54 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
55 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
56 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
57 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
58 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
59 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
60 import org.apache.hadoop.util.CloseableReferenceCount;
61 import org.apache.hadoop.io.IOUtils;
62 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
63 
64 import com.google.common.util.concurrent.ThreadFactoryBuilder;
65 import org.apache.hadoop.util.Time;
66 import org.codehaus.jackson.annotate.JsonProperty;
67 import org.codehaus.jackson.map.ObjectMapper;
68 import org.codehaus.jackson.map.ObjectReader;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 
72 /**
 * The underlying volume used to store replicas.
74  *
75  * It uses the {@link FsDatasetImpl} object for synchronization.
76  */
77 @InterfaceAudience.Private
78 @VisibleForTesting
79 public class FsVolumeImpl implements FsVolumeSpi {
80   public static final Logger LOG =
81       LoggerFactory.getLogger(FsVolumeImpl.class);
82 
83   private final FsDatasetImpl dataset;
84   private final String storageID;
85   private final StorageType storageType;
86   private final Map<String, BlockPoolSlice> bpSlices
87       = new ConcurrentHashMap<String, BlockPoolSlice>();
88   private final File currentDir;    // <StorageDirectory>/current
89   private final DF usage;
90   private final long reserved;
91   private CloseableReferenceCount reference = new CloseableReferenceCount();
92 
93   // Disk space reserved for open blocks.
94   private AtomicLong reservedForRbw;
95 
96   // Capacity configured. This is useful when we want to
97   // limit the visible capacity for tests. If negative, then we just
98   // query from the filesystem.
99   protected volatile long configuredCapacity;
100 
101   /**
102    * Per-volume worker pool that processes new blocks to cache.
103    * The maximum number of workers per volume is bounded (configurable via
104    * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
105    * contention.
106    */
107   protected ThreadPoolExecutor cacheExecutor;
108 
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, Configuration conf, StorageType storageType)109   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
110       Configuration conf, StorageType storageType) throws IOException {
111     this.dataset = dataset;
112     this.storageID = storageID;
113     this.reserved = conf.getLong(
114         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
115         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
116     this.reservedForRbw = new AtomicLong(0L);
117     this.currentDir = currentDir;
118     File parent = currentDir.getParentFile();
119     this.usage = new DF(parent, conf);
120     this.storageType = storageType;
121     this.configuredCapacity = -1;
122     cacheExecutor = initializeCacheExecutor(parent);
123   }
124 
initializeCacheExecutor(File parent)125   protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
126     if (storageType.isTransient()) {
127       return null;
128     }
129     if (dataset.datanode == null) {
130       // FsVolumeImpl is used in test.
131       return null;
132     }
133 
134     final int maxNumThreads = dataset.datanode.getConf().getInt(
135         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
136         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
137 
138     ThreadFactory workerFactory = new ThreadFactoryBuilder()
139         .setDaemon(true)
140         .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
141         .build();
142     ThreadPoolExecutor executor = new ThreadPoolExecutor(
143         1, maxNumThreads,
144         60, TimeUnit.SECONDS,
145         new LinkedBlockingQueue<Runnable>(),
146         workerFactory);
147     executor.allowCoreThreadTimeOut(true);
148     return executor;
149   }
150 
printReferenceTraceInfo(String op)151   private void printReferenceTraceInfo(String op) {
152     StackTraceElement[] stack = Thread.currentThread().getStackTrace();
153     for (StackTraceElement ste : stack) {
154       switch (ste.getMethodName()) {
155       case "getDfsUsed":
156       case "getBlockPoolUsed":
157       case "getAvailable":
158       case "getVolumeMap":
159         return;
160       default:
161         break;
162       }
163     }
164     FsDatasetImpl.LOG.trace("Reference count: " + op + " " + this + ": " +
165         this.reference.getReferenceCount());
166     FsDatasetImpl.LOG.trace(
167         Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
168   }
169 
170   /**
171    * Increase the reference count. The caller must increase the reference count
172    * before issuing IOs.
173    *
174    * @throws IOException if the volume is already closed.
175    */
reference()176   private void reference() throws ClosedChannelException {
177     this.reference.reference();
178     if (FsDatasetImpl.LOG.isTraceEnabled()) {
179       printReferenceTraceInfo("incr");
180     }
181   }
182 
183   /**
184    * Decrease the reference count.
185    */
unreference()186   private void unreference() {
187     if (FsDatasetImpl.LOG.isTraceEnabled()) {
188       printReferenceTraceInfo("desc");
189     }
190     if (FsDatasetImpl.LOG.isDebugEnabled()) {
191       if (reference.getReferenceCount() <= 0) {
192         FsDatasetImpl.LOG.debug("Decrease reference count <= 0 on " + this +
193           Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
194       }
195     }
196     checkReference();
197     this.reference.unreference();
198   }
199 
200   private static class FsVolumeReferenceImpl implements FsVolumeReference {
201     private final FsVolumeImpl volume;
202 
FsVolumeReferenceImpl(FsVolumeImpl volume)203     FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException {
204       this.volume = volume;
205       volume.reference();
206     }
207 
208     /**
209      * Decreases the reference count.
210      * @throws IOException it never throws IOException.
211      */
212     @Override
close()213     public void close() throws IOException {
214       volume.unreference();
215     }
216 
217     @Override
getVolume()218     public FsVolumeSpi getVolume() {
219       return this.volume;
220     }
221   }
222 
223   @Override
obtainReference()224   public FsVolumeReference obtainReference() throws ClosedChannelException {
225     return new FsVolumeReferenceImpl(this);
226   }
227 
checkReference()228   private void checkReference() {
229     Preconditions.checkState(reference.getReferenceCount() > 0);
230   }
231 
232   /**
233    * Close this volume and wait all other threads to release the reference count
234    * on this volume.
235    * @throws IOException if the volume is closed or the waiting is interrupted.
236    */
closeAndWait()237   void closeAndWait() throws IOException {
238     try {
239       this.reference.setClosed();
240     } catch (ClosedChannelException e) {
241       throw new IOException("The volume has already closed.", e);
242     }
243     final int SLEEP_MILLIS = 500;
244     while (this.reference.getReferenceCount() > 0) {
245       if (FsDatasetImpl.LOG.isDebugEnabled()) {
246         FsDatasetImpl.LOG.debug(String.format(
247             "The reference count for %s is %d, wait to be 0.",
248             this, reference.getReferenceCount()));
249       }
250       try {
251         Thread.sleep(SLEEP_MILLIS);
252       } catch (InterruptedException e) {
253         throw new IOException(e);
254       }
255     }
256   }
257 
getCurrentDir()258   File getCurrentDir() {
259     return currentDir;
260   }
261 
getRbwDir(String bpid)262   File getRbwDir(String bpid) throws IOException {
263     return getBlockPoolSlice(bpid).getRbwDir();
264   }
265 
getLazyPersistDir(String bpid)266   File getLazyPersistDir(String bpid) throws IOException {
267     return getBlockPoolSlice(bpid).getLazypersistDir();
268   }
269 
getTmpDir(String bpid)270   File getTmpDir(String bpid) throws IOException {
271     return getBlockPoolSlice(bpid).getTmpDir();
272   }
273 
decDfsUsed(String bpid, long value)274   void decDfsUsed(String bpid, long value) {
275     synchronized(dataset) {
276       BlockPoolSlice bp = bpSlices.get(bpid);
277       if (bp != null) {
278         bp.decDfsUsed(value);
279       }
280     }
281   }
282 
incDfsUsed(String bpid, long value)283   void incDfsUsed(String bpid, long value) {
284     synchronized(dataset) {
285       BlockPoolSlice bp = bpSlices.get(bpid);
286       if (bp != null) {
287         bp.incDfsUsed(value);
288       }
289     }
290   }
291 
292   @VisibleForTesting
getDfsUsed()293   public long getDfsUsed() throws IOException {
294     long dfsUsed = 0;
295     synchronized(dataset) {
296       for(BlockPoolSlice s : bpSlices.values()) {
297         dfsUsed += s.getDfsUsed();
298       }
299     }
300     return dfsUsed;
301   }
302 
getBlockPoolUsed(String bpid)303   long getBlockPoolUsed(String bpid) throws IOException {
304     return getBlockPoolSlice(bpid).getDfsUsed();
305   }
306 
307   /**
308    * Return either the configured capacity of the file system if configured; or
309    * the capacity of the file system excluding space reserved for non-HDFS.
310    *
311    * @return the unreserved number of bytes left in this filesystem. May be
312    *         zero.
313    */
314   @VisibleForTesting
getCapacity()315   public long getCapacity() {
316     if (configuredCapacity < 0) {
317       long remaining = usage.getCapacity() - reserved;
318       return remaining > 0 ? remaining : 0;
319     }
320 
321     return configuredCapacity;
322   }
323 
324   /**
325    * This function MUST NOT be used outside of tests.
326    *
327    * @param capacity
328    */
329   @VisibleForTesting
setCapacityForTesting(long capacity)330   public void setCapacityForTesting(long capacity) {
331     this.configuredCapacity = capacity;
332   }
333 
334   /*
335    * Calculate the available space of the filesystem, excluding space reserved
336    * for non-HDFS and space reserved for RBW
337    *
338    * @return the available number of bytes left in this filesystem. May be zero.
339    */
340   @Override
getAvailable()341   public long getAvailable() throws IOException {
342     long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
343     long available = usage.getAvailable() - reserved - reservedForRbw.get();
344     if (remaining > available) {
345       remaining = available;
346     }
347     return (remaining > 0) ? remaining : 0;
348   }
349 
350   @VisibleForTesting
getReservedForRbw()351   public long getReservedForRbw() {
352     return reservedForRbw.get();
353   }
354 
getReserved()355   long getReserved(){
356     return reserved;
357   }
358 
getBlockPoolSlice(String bpid)359   BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
360     BlockPoolSlice bp = bpSlices.get(bpid);
361     if (bp == null) {
362       throw new IOException("block pool " + bpid + " is not found");
363     }
364     return bp;
365   }
366 
367   @Override
getBasePath()368   public String getBasePath() {
369     return currentDir.getParent();
370   }
371 
372   @Override
isTransientStorage()373   public boolean isTransientStorage() {
374     return storageType.isTransient();
375   }
376 
377   @Override
getPath(String bpid)378   public String getPath(String bpid) throws IOException {
379     return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
380   }
381 
382   @Override
getFinalizedDir(String bpid)383   public File getFinalizedDir(String bpid) throws IOException {
384     return getBlockPoolSlice(bpid).getFinalizedDir();
385   }
386 
387   /**
388    * Make a deep copy of the list of currently active BPIDs
389    */
390   @Override
getBlockPoolList()391   public String[] getBlockPoolList() {
392     return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
393   }
394 
395   /**
396    * Temporary files. They get moved to the finalized block directory when
397    * the block is finalized.
398    */
createTmpFile(String bpid, Block b)399   File createTmpFile(String bpid, Block b) throws IOException {
400     checkReference();
401     return getBlockPoolSlice(bpid).createTmpFile(b);
402   }
403 
404   @Override
reserveSpaceForRbw(long bytesToReserve)405   public void reserveSpaceForRbw(long bytesToReserve) {
406     if (bytesToReserve != 0) {
407       reservedForRbw.addAndGet(bytesToReserve);
408     }
409   }
410 
411   @Override
releaseReservedSpace(long bytesToRelease)412   public void releaseReservedSpace(long bytesToRelease) {
413     if (bytesToRelease != 0) {
414 
415       long oldReservation, newReservation;
416       do {
417         oldReservation = reservedForRbw.get();
418         newReservation = oldReservation - bytesToRelease;
419         if (newReservation < 0) {
420           // Failsafe, this should never occur in practice, but if it does we don't
421           // want to start advertising more space than we have available.
422           newReservation = 0;
423         }
424       } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
425     }
426   }
427 
428   private enum SubdirFilter implements FilenameFilter {
429     INSTANCE;
430 
431     @Override
accept(File dir, String name)432     public boolean accept(File dir, String name) {
433       return name.startsWith("subdir");
434     }
435   }
436 
437   private enum BlockFileFilter implements FilenameFilter {
438     INSTANCE;
439 
440     @Override
accept(File dir, String name)441     public boolean accept(File dir, String name) {
442       return !name.endsWith(".meta") &&
443               name.startsWith(Block.BLOCK_FILE_PREFIX);
444     }
445   }
446 
447   @VisibleForTesting
nextSorted(List<String> arr, String prev)448   public static String nextSorted(List<String> arr, String prev) {
449     int res = 0;
450     if (prev != null) {
451       res = Collections.binarySearch(arr, prev);
452       if (res < 0) {
453         res = -1 - res;
454       } else {
455         res++;
456       }
457     }
458     if (res >= arr.size()) {
459       return null;
460     }
461     return arr.get(res);
462   }
463 
464   private static class BlockIteratorState {
BlockIteratorState()465     BlockIteratorState() {
466       lastSavedMs = iterStartMs = Time.now();
467       curFinalizedDir = null;
468       curFinalizedSubDir = null;
469       curEntry = null;
470       atEnd = false;
471     }
472 
473     // The wall-clock ms since the epoch at which this iterator was last saved.
474     @JsonProperty
475     private long lastSavedMs;
476 
477     // The wall-clock ms since the epoch at which this iterator was created.
478     @JsonProperty
479     private long iterStartMs;
480 
481     @JsonProperty
482     private String curFinalizedDir;
483 
484     @JsonProperty
485     private String curFinalizedSubDir;
486 
487     @JsonProperty
488     private String curEntry;
489 
490     @JsonProperty
491     private boolean atEnd;
492   }
493 
494   /**
495    * A BlockIterator implementation for FsVolumeImpl.
496    */
497   private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator {
498     private final File bpidDir;
499     private final String name;
500     private final String bpid;
501     private long maxStalenessMs = 0;
502 
503     private List<String> cache;
504     private long cacheMs;
505 
506     private BlockIteratorState state;
507 
BlockIteratorImpl(String bpid, String name)508     BlockIteratorImpl(String bpid, String name) {
509       this.bpidDir = new File(currentDir, bpid);
510       this.name = name;
511       this.bpid = bpid;
512       rewind();
513     }
514 
515     /**
516      * Get the next subdirectory within the block pool slice.
517      *
518      * @return         The next subdirectory within the block pool slice, or
519      *                   null if there are no more.
520      */
getNextSubDir(String prev, File dir)521     private String getNextSubDir(String prev, File dir)
522           throws IOException {
523       List<String> children =
524           IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
525       cache = null;
526       cacheMs = 0;
527       if (children.size() == 0) {
528         LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
529             storageID, bpid, dir.getAbsolutePath());
530         return null;
531       }
532       Collections.sort(children);
533       String nextSubDir = nextSorted(children, prev);
534       if (nextSubDir == null) {
535         LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
536             storageID, bpid, dir.getAbsolutePath());
537       } else {
538         LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
539             "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
540       }
541       return nextSubDir;
542     }
543 
getNextFinalizedDir()544     private String getNextFinalizedDir() throws IOException {
545       File dir = Paths.get(
546           bpidDir.getAbsolutePath(), "current", "finalized").toFile();
547       return getNextSubDir(state.curFinalizedDir, dir);
548     }
549 
getNextFinalizedSubDir()550     private String getNextFinalizedSubDir() throws IOException {
551       if (state.curFinalizedDir == null) {
552         return null;
553       }
554       File dir = Paths.get(
555           bpidDir.getAbsolutePath(), "current", "finalized",
556               state.curFinalizedDir).toFile();
557       return getNextSubDir(state.curFinalizedSubDir, dir);
558     }
559 
getSubdirEntries()560     private List<String> getSubdirEntries() throws IOException {
561       if (state.curFinalizedSubDir == null) {
562         return null; // There are no entries in the null subdir.
563       }
564       long now = Time.monotonicNow();
565       if (cache != null) {
566         long delta = now - cacheMs;
567         if (delta < maxStalenessMs) {
568           return cache;
569         } else {
570           LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " +
571             "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta);
572           cache = null;
573         }
574       }
575       File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
576                     state.curFinalizedDir, state.curFinalizedSubDir).toFile();
577       List<String> entries =
578           IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
579       if (entries.size() == 0) {
580         entries = null;
581       } else {
582         Collections.sort(entries);
583       }
584       if (entries == null) {
585         LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
586             storageID, bpid, dir.getAbsolutePath());
587       } else {
588         LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
589             storageID, bpid, entries.size(), dir.getAbsolutePath());
590       }
591       cache = entries;
592       cacheMs = now;
593       return cache;
594     }
595 
    /**
     * Get the next block.<p/>
     *
     * Each volume has a hierarchical structure.<p/>
     *
     * <code>
     * BPID B0
     *   finalized/
     *     subdir0
     *       subdir0
     *         blk_000
     *         blk_001
     *       ...
     *     subdir1
     *       subdir0
     *         ...
     *   rbw/
     * </code>
     *
     * When we run out of entries at one level of the structure, we search
     * progressively higher levels.  For example, when we run out of blk_
     * entries in a subdirectory, we search for the next subdirectory.
     * And so on.
     *
     * @return the next block, or null once the iterator is at the end
     * @throws IOException if a directory cannot be listed; the iterator is
     *         marked as being at the end before the exception is rethrown
     */
    @Override
    public ExtendedBlock nextBlock() throws IOException {
      if (state.atEnd) {
        return null;
      }
      try {
        while (true) {
          // Entries of the current second-level subdirectory; null when no
          // subdirectory is selected or it holds no block files.
          List<String> entries = getSubdirEntries();
          if (entries != null) {
            // Advance the cursor to the entry after the last returned one.
            state.curEntry = nextSorted(entries, state.curEntry);
            if (state.curEntry == null) {
              LOG.trace("nextBlock({}, {}): advancing from {} to next " +
                  "subdirectory.", storageID, bpid, state.curFinalizedSubDir);
            } else {
              ExtendedBlock block =
                  new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
              LOG.trace("nextBlock({}, {}): advancing to {}",
                  storageID, bpid, block);
              return block;
            }
          }
          // This subdirectory is exhausted: move to the next second-level
          // subdirectory, and if there is none, to the next first-level one.
          state.curFinalizedSubDir = getNextFinalizedSubDir();
          if (state.curFinalizedSubDir == null) {
            state.curFinalizedDir = getNextFinalizedDir();
            if (state.curFinalizedDir == null) {
              // No first-level directories are left: iteration is complete.
              state.atEnd = true;
              return null;
            }
          }
        }
      } catch (IOException e) {
        // Stop the iteration on I/O errors instead of retrying.
        state.atEnd = true;
        LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e);
        throw e;
      }
    }
656 
657     @Override
atEnd()658     public boolean atEnd() {
659       return state.atEnd;
660     }
661 
662     @Override
rewind()663     public void rewind() {
664       cache = null;
665       cacheMs = 0;
666       state = new BlockIteratorState();
667     }
668 
669     @Override
save()670     public void save() throws IOException {
671       state.lastSavedMs = Time.now();
672       boolean success = false;
673       ObjectMapper mapper = new ObjectMapper();
674       try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
675                 new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
676         mapper.writerWithDefaultPrettyPrinter().writeValue(writer, state);
677         success = true;
678       } finally {
679         if (!success) {
680           if (getTempSaveFile().delete()) {
681             LOG.debug("save({}, {}): error deleting temporary file.",
682                 storageID, bpid);
683           }
684         }
685       }
686       Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
687           StandardCopyOption.ATOMIC_MOVE);
688       if (LOG.isTraceEnabled()) {
689         LOG.trace("save({}, {}): saved {}", storageID, bpid,
690             mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
691       }
692     }
693 
load()694     public void load() throws IOException {
695       ObjectMapper mapper = new ObjectMapper();
696       File file = getSaveFile();
697       this.state = mapper.reader(BlockIteratorState.class).readValue(file);
698       LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
699           bpid, name, file.getAbsoluteFile(),
700           mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
701     }
702 
getSaveFile()703     File getSaveFile() {
704       return new File(bpidDir, name + ".cursor");
705     }
706 
getTempSaveFile()707     File getTempSaveFile() {
708       return new File(bpidDir, name + ".cursor.tmp");
709     }
710 
711     @Override
setMaxStalenessMs(long maxStalenessMs)712     public void setMaxStalenessMs(long maxStalenessMs) {
713       this.maxStalenessMs = maxStalenessMs;
714     }
715 
716     @Override
close()717     public void close() throws IOException {
718       // No action needed for this volume implementation.
719     }
720 
721     @Override
getIterStartMs()722     public long getIterStartMs() {
723       return state.iterStartMs;
724     }
725 
726     @Override
getLastSavedMs()727     public long getLastSavedMs() {
728       return state.lastSavedMs;
729     }
730 
731     @Override
getBlockPoolId()732     public String getBlockPoolId() {
733       return bpid;
734     }
735   }
736 
737   @Override
newBlockIterator(String bpid, String name)738   public BlockIterator newBlockIterator(String bpid, String name) {
739     return new BlockIteratorImpl(bpid, name);
740   }
741 
742   @Override
loadBlockIterator(String bpid, String name)743   public BlockIterator loadBlockIterator(String bpid, String name)
744       throws IOException {
745     BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name);
746     iter.load();
747     return iter;
748   }
749 
750   @Override
getDataset()751   public FsDatasetSpi getDataset() {
752     return dataset;
753   }
754 
755   /**
756    * RBW files. They get moved to the finalized block directory when
757    * the block is finalized.
758    */
createRbwFile(String bpid, Block b)759   File createRbwFile(String bpid, Block b) throws IOException {
760     checkReference();
761     reserveSpaceForRbw(b.getNumBytes());
762     try {
763       return getBlockPoolSlice(bpid).createRbwFile(b);
764     } catch (IOException exception) {
765       releaseReservedSpace(b.getNumBytes());
766       throw exception;
767     }
768   }
769 
770   /**
771    *
772    * @param bytesReservedForRbw Space that was reserved during
773    *     block creation. Now that the block is being finalized we
774    *     can free up this space.
775    * @return
776    * @throws IOException
777    */
addFinalizedBlock(String bpid, Block b, File f, long bytesReservedForRbw)778   File addFinalizedBlock(String bpid, Block b,
779                          File f, long bytesReservedForRbw)
780       throws IOException {
781     releaseReservedSpace(bytesReservedForRbw);
782     return getBlockPoolSlice(bpid).addBlock(b, f);
783   }
784 
getCacheExecutor()785   Executor getCacheExecutor() {
786     return cacheExecutor;
787   }
788 
checkDirs()789   void checkDirs() throws DiskErrorException {
790     // TODO:FEDERATION valid synchronization
791     for(BlockPoolSlice s : bpSlices.values()) {
792       s.checkDirs();
793     }
794   }
795 
getVolumeMap(ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap)796   void getVolumeMap(ReplicaMap volumeMap,
797                     final RamDiskReplicaTracker ramDiskReplicaMap)
798       throws IOException {
799     for(BlockPoolSlice s : bpSlices.values()) {
800       s.getVolumeMap(volumeMap, ramDiskReplicaMap);
801     }
802   }
803 
getVolumeMap(String bpid, ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap)804   void getVolumeMap(String bpid, ReplicaMap volumeMap,
805                     final RamDiskReplicaTracker ramDiskReplicaMap)
806       throws IOException {
807     getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
808   }
809 
810   @Override
toString()811   public String toString() {
812     return currentDir.getAbsolutePath();
813   }
814 
shutdown()815   void shutdown() {
816     if (cacheExecutor != null) {
817       cacheExecutor.shutdown();
818     }
819     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
820     for (Entry<String, BlockPoolSlice> entry : set) {
821       entry.getValue().shutdown();
822     }
823   }
824 
addBlockPool(String bpid, Configuration conf)825   void addBlockPool(String bpid, Configuration conf) throws IOException {
826     File bpdir = new File(currentDir, bpid);
827     BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
828     bpSlices.put(bpid, bp);
829   }
830 
shutdownBlockPool(String bpid)831   void shutdownBlockPool(String bpid) {
832     BlockPoolSlice bp = bpSlices.get(bpid);
833     if (bp != null) {
834       bp.shutdown();
835     }
836     bpSlices.remove(bpid);
837   }
838 
isBPDirEmpty(String bpid)839   boolean isBPDirEmpty(String bpid) throws IOException {
840     File volumeCurrentDir = this.getCurrentDir();
841     File bpDir = new File(volumeCurrentDir, bpid);
842     File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
843     File finalizedDir = new File(bpCurrentDir,
844         DataStorage.STORAGE_DIR_FINALIZED);
845     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
846     if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
847         finalizedDir)) {
848       return false;
849     }
850     if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
851       return false;
852     }
853     return true;
854   }
855 
deleteBPDirectories(String bpid, boolean force)856   void deleteBPDirectories(String bpid, boolean force) throws IOException {
857     File volumeCurrentDir = this.getCurrentDir();
858     File bpDir = new File(volumeCurrentDir, bpid);
859     if (!bpDir.isDirectory()) {
860       // nothing to be deleted
861       return;
862     }
863     File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
864     File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
865     File finalizedDir = new File(bpCurrentDir,
866         DataStorage.STORAGE_DIR_FINALIZED);
867     File lazypersistDir = new File(bpCurrentDir,
868         DataStorage.STORAGE_DIR_LAZY_PERSIST);
869     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
870     if (force) {
871       FileUtil.fullyDelete(bpDir);
872     } else {
873       if (!rbwDir.delete()) {
874         throw new IOException("Failed to delete " + rbwDir);
875       }
876       if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
877           !FileUtil.fullyDelete(finalizedDir)) {
878         throw new IOException("Failed to delete " + finalizedDir);
879       }
880       if (lazypersistDir.exists() &&
881         ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
882           !FileUtil.fullyDelete(lazypersistDir)))) {
883         throw new IOException("Failed to delete " + lazypersistDir);
884       }
885       FileUtil.fullyDelete(tmpDir);
886       for (File f : FileUtil.listFiles(bpCurrentDir)) {
887         if (!f.delete()) {
888           throw new IOException("Failed to delete " + f);
889         }
890       }
891       if (!bpCurrentDir.delete()) {
892         throw new IOException("Failed to delete " + bpCurrentDir);
893       }
894       for (File f : FileUtil.listFiles(bpDir)) {
895         if (!f.delete()) {
896           throw new IOException("Failed to delete " + f);
897         }
898       }
899       if (!bpDir.delete()) {
900         throw new IOException("Failed to delete " + bpDir);
901       }
902     }
903   }
904 
905   @Override
getStorageID()906   public String getStorageID() {
907     return storageID;
908   }
909 
910   @Override
getStorageType()911   public StorageType getStorageType() {
912     return storageType;
913   }
914 
toDatanodeStorage()915   DatanodeStorage toDatanodeStorage() {
916     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
917   }
918 }
919 
920