/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
19 
20 import java.io.File;
21 import java.io.IOException;
22 import java.nio.channels.ClosedChannelException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.TreeMap;
32 import java.util.Set;
33 import java.util.concurrent.atomic.AtomicReference;
34 
35 import com.google.common.collect.Lists;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.StorageType;
38 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
39 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
40 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
41 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
42 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
43 import org.apache.hadoop.io.IOUtils;
44 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
45 import org.apache.hadoop.util.Time;
46 
47 class FsVolumeList {
48   private final AtomicReference<FsVolumeImpl[]> volumes =
49       new AtomicReference<>(new FsVolumeImpl[0]);
50   // Tracks volume failures, sorted by volume path.
51   private final Map<String, VolumeFailureInfo> volumeFailureInfos =
52       Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
53   private Object checkDirsMutex = new Object();
54 
55   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
56   private final BlockScanner blockScanner;
57 
FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos, BlockScanner blockScanner, VolumeChoosingPolicy<FsVolumeImpl> blockChooser)58   FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
59       BlockScanner blockScanner,
60       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
61     this.blockChooser = blockChooser;
62     this.blockScanner = blockScanner;
63     for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
64       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
65           volumeFailureInfo);
66     }
67   }
68 
69   /**
70    * Return an immutable list view of all the volumes.
71    */
getVolumes()72   List<FsVolumeImpl> getVolumes() {
73     return Collections.unmodifiableList(Arrays.asList(volumes.get()));
74   }
75 
chooseVolume(List<FsVolumeImpl> list, long blockSize)76   private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
77       throws IOException {
78     while (true) {
79       FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
80       try {
81         return volume.obtainReference();
82       } catch (ClosedChannelException e) {
83         FsDatasetImpl.LOG.warn("Chosen a closed volume: " + volume);
84         // blockChooser.chooseVolume returns DiskOutOfSpaceException when the list
85         // is empty, indicating that all volumes are closed.
86         list.remove(volume);
87       }
88     }
89   }
90 
91   /**
92    * Get next volume.
93    *
94    * @param blockSize free space needed on the volume
95    * @param storageType the desired {@link StorageType}
96    * @return next volume to store the block in.
97    */
getNextVolume(StorageType storageType, long blockSize)98   FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
99       throws IOException {
100     // Get a snapshot of currently available volumes.
101     final FsVolumeImpl[] curVolumes = volumes.get();
102     final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
103     for(FsVolumeImpl v : curVolumes) {
104       if (v.getStorageType() == storageType) {
105         list.add(v);
106       }
107     }
108     return chooseVolume(list, blockSize);
109   }
110 
111   /**
112    * Get next volume.
113    *
114    * @param blockSize free space needed on the volume
115    * @return next volume to store the block in.
116    */
getNextTransientVolume(long blockSize)117   FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
118     // Get a snapshot of currently available volumes.
119     final List<FsVolumeImpl> curVolumes = getVolumes();
120     final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size());
121     for(FsVolumeImpl v : curVolumes) {
122       if (v.isTransientStorage()) {
123         list.add(v);
124       }
125     }
126     return chooseVolume(list, blockSize);
127   }
128 
getDfsUsed()129   long getDfsUsed() throws IOException {
130     long dfsUsed = 0L;
131     for (FsVolumeImpl v : volumes.get()) {
132       try(FsVolumeReference ref = v.obtainReference()) {
133         dfsUsed += v.getDfsUsed();
134       } catch (ClosedChannelException e) {
135         // ignore.
136       }
137     }
138     return dfsUsed;
139   }
140 
getBlockPoolUsed(String bpid)141   long getBlockPoolUsed(String bpid) throws IOException {
142     long dfsUsed = 0L;
143     for (FsVolumeImpl v : volumes.get()) {
144       try (FsVolumeReference ref = v.obtainReference()) {
145         dfsUsed += v.getBlockPoolUsed(bpid);
146       } catch (ClosedChannelException e) {
147         // ignore.
148       }
149     }
150     return dfsUsed;
151   }
152 
getCapacity()153   long getCapacity() {
154     long capacity = 0L;
155     for (FsVolumeImpl v : volumes.get()) {
156       try (FsVolumeReference ref = v.obtainReference()) {
157         capacity += v.getCapacity();
158       } catch (IOException e) {
159         // ignore.
160       }
161     }
162     return capacity;
163   }
164 
getRemaining()165   long getRemaining() throws IOException {
166     long remaining = 0L;
167     for (FsVolumeSpi vol : volumes.get()) {
168       try (FsVolumeReference ref = vol.obtainReference()) {
169         remaining += vol.getAvailable();
170       } catch (ClosedChannelException e) {
171         // ignore
172       }
173     }
174     return remaining;
175   }
176 
getAllVolumesMap(final String bpid, final ReplicaMap volumeMap, final RamDiskReplicaTracker ramDiskReplicaMap)177   void getAllVolumesMap(final String bpid,
178                         final ReplicaMap volumeMap,
179                         final RamDiskReplicaTracker ramDiskReplicaMap)
180       throws IOException {
181     long totalStartTime = Time.monotonicNow();
182     final List<IOException> exceptions = Collections.synchronizedList(
183         new ArrayList<IOException>());
184     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
185     for (final FsVolumeImpl v : volumes.get()) {
186       Thread t = new Thread() {
187         public void run() {
188           try (FsVolumeReference ref = v.obtainReference()) {
189             FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
190                 bpid + " on volume " + v + "...");
191             long startTime = Time.monotonicNow();
192             v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
193             long timeTaken = Time.monotonicNow() - startTime;
194             FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
195                 + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
196           } catch (ClosedChannelException e) {
197             FsDatasetImpl.LOG.info("The volume " + v + " is closed while " +
198                 "addng replicas, ignored.");
199           } catch (IOException ioe) {
200             FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
201                 "from " + v + ". Will throw later.", ioe);
202             exceptions.add(ioe);
203           }
204         }
205       };
206       replicaAddingThreads.add(t);
207       t.start();
208     }
209     for (Thread t : replicaAddingThreads) {
210       try {
211         t.join();
212       } catch (InterruptedException ie) {
213         throw new IOException(ie);
214       }
215     }
216     if (!exceptions.isEmpty()) {
217       throw exceptions.get(0);
218     }
219     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
220     FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
221         + totalTimeTaken + "ms");
222   }
223 
224   /**
225    * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
226    *
227    * Use checkDirsMutext to allow only one instance of checkDirs() call
228    *
229    * @return list of all the failed volumes.
230    */
checkDirs()231   Set<File> checkDirs() {
232     synchronized(checkDirsMutex) {
233       Set<File> failedVols = null;
234 
235       // Make a copy of volumes for performing modification
236       final List<FsVolumeImpl> volumeList = getVolumes();
237 
238       for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
239         final FsVolumeImpl fsv = i.next();
240         try (FsVolumeReference ref = fsv.obtainReference()) {
241           fsv.checkDirs();
242         } catch (DiskErrorException e) {
243           FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
244           if (failedVols == null) {
245             failedVols = new HashSet<>(1);
246           }
247           failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
248           addVolumeFailureInfo(fsv);
249           removeVolume(fsv);
250         } catch (ClosedChannelException e) {
251           FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
252             "reference count on closed volume", e);
253         } catch (IOException e) {
254           FsDatasetImpl.LOG.error("Unexpected IOException", e);
255         }
256       }
257 
258       if (failedVols != null && failedVols.size() > 0) {
259         FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
260             + " failure volumes.");
261       }
262 
263       return failedVols;
264     }
265   }
266 
267   @Override
toString()268   public String toString() {
269     return Arrays.toString(volumes.get());
270   }
271 
272   /**
273    * Dynamically add new volumes to the existing volumes that this DN manages.
274    *
275    * @param ref       a reference to the new FsVolumeImpl instance.
276    */
addVolume(FsVolumeReference ref)277   void addVolume(FsVolumeReference ref) {
278     while (true) {
279       final FsVolumeImpl[] curVolumes = volumes.get();
280       final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
281       volumeList.add((FsVolumeImpl)ref.getVolume());
282       if (volumes.compareAndSet(curVolumes,
283           volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
284         break;
285       } else {
286         if (FsDatasetImpl.LOG.isDebugEnabled()) {
287           FsDatasetImpl.LOG.debug(
288               "The volume list has been changed concurrently, " +
289                   "retry to remove volume: " + ref.getVolume().getStorageID());
290         }
291       }
292     }
293     if (blockScanner != null) {
294       blockScanner.addVolumeScanner(ref);
295     } else {
296       // If the volume is not put into a volume scanner, it does not need to
297       // hold the reference.
298       IOUtils.cleanup(FsDatasetImpl.LOG, ref);
299     }
300     // If the volume is used to replace a failed volume, it needs to reset the
301     // volume failure info for this volume.
302     removeVolumeFailureInfo(new File(ref.getVolume().getBasePath()));
303     FsDatasetImpl.LOG.info("Added new volume: " +
304         ref.getVolume().getStorageID());
305   }
306 
307   /**
308    * Dynamically remove a volume in the list.
309    * @param target the volume instance to be removed.
310    */
removeVolume(FsVolumeImpl target)311   private void removeVolume(FsVolumeImpl target) {
312     while (true) {
313       final FsVolumeImpl[] curVolumes = volumes.get();
314       final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
315       if (volumeList.remove(target)) {
316         if (volumes.compareAndSet(curVolumes,
317             volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
318           if (blockScanner != null) {
319             blockScanner.removeVolumeScanner(target);
320           }
321           try {
322             target.closeAndWait();
323           } catch (IOException e) {
324             FsDatasetImpl.LOG.warn(
325                 "Error occurs when waiting volume to close: " + target, e);
326           }
327           target.shutdown();
328           FsDatasetImpl.LOG.info("Removed volume: " + target);
329           break;
330         } else {
331           if (FsDatasetImpl.LOG.isDebugEnabled()) {
332             FsDatasetImpl.LOG.debug(
333                 "The volume list has been changed concurrently, " +
334                 "retry to remove volume: " + target);
335           }
336         }
337       } else {
338         if (FsDatasetImpl.LOG.isDebugEnabled()) {
339           FsDatasetImpl.LOG.debug("Volume " + target +
340               " does not exist or is removed by others.");
341         }
342         break;
343       }
344     }
345   }
346 
347   /**
348    * Dynamically remove volume in the list.
349    * @param volume the volume to be removed.
350    * @param clearFailure set true to remove failure info for this volume.
351    */
removeVolume(File volume, boolean clearFailure)352   void removeVolume(File volume, boolean clearFailure) {
353     // Make a copy of volumes to remove one volume.
354     final FsVolumeImpl[] curVolumes = volumes.get();
355     final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
356     for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
357       FsVolumeImpl fsVolume = it.next();
358       String basePath, targetPath;
359       basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
360       targetPath = volume.getAbsolutePath();
361       if (basePath.equals(targetPath)) {
362         // Make sure the removed volume is the one in the curVolumes.
363         removeVolume(fsVolume);
364       }
365     }
366     if (clearFailure) {
367       removeVolumeFailureInfo(volume);
368     }
369   }
370 
getVolumeFailureInfos()371   VolumeFailureInfo[] getVolumeFailureInfos() {
372     Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
373     return infos.toArray(new VolumeFailureInfo[infos.size()]);
374   }
375 
addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo)376   void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
377     volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
378         volumeFailureInfo);
379   }
380 
addVolumeFailureInfo(FsVolumeImpl vol)381   private void addVolumeFailureInfo(FsVolumeImpl vol) {
382     addVolumeFailureInfo(new VolumeFailureInfo(
383         new File(vol.getBasePath()).getAbsolutePath(),
384         Time.now(),
385         vol.getCapacity()));
386   }
387 
removeVolumeFailureInfo(File vol)388   private void removeVolumeFailureInfo(File vol) {
389     volumeFailureInfos.remove(vol.getAbsolutePath());
390   }
391 
addBlockPool(final String bpid, final Configuration conf)392   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
393     long totalStartTime = Time.monotonicNow();
394 
395     final List<IOException> exceptions = Collections.synchronizedList(
396         new ArrayList<IOException>());
397     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
398     for (final FsVolumeImpl v : volumes.get()) {
399       Thread t = new Thread() {
400         public void run() {
401           try (FsVolumeReference ref = v.obtainReference()) {
402             FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
403                 " on volume " + v + "...");
404             long startTime = Time.monotonicNow();
405             v.addBlockPool(bpid, conf);
406             long timeTaken = Time.monotonicNow() - startTime;
407             FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
408                 " on " + v + ": " + timeTaken + "ms");
409           } catch (ClosedChannelException e) {
410             // ignore.
411           } catch (IOException ioe) {
412             FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
413                 ". Will throw later.", ioe);
414             exceptions.add(ioe);
415           }
416         }
417       };
418       blockPoolAddingThreads.add(t);
419       t.start();
420     }
421     for (Thread t : blockPoolAddingThreads) {
422       try {
423         t.join();
424       } catch (InterruptedException ie) {
425         throw new IOException(ie);
426       }
427     }
428     if (!exceptions.isEmpty()) {
429       throw exceptions.get(0);
430     }
431 
432     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
433     FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
434         bpid + ": " + totalTimeTaken + "ms");
435   }
436 
removeBlockPool(String bpid)437   void removeBlockPool(String bpid) {
438     for (FsVolumeImpl v : volumes.get()) {
439       v.shutdownBlockPool(bpid);
440     }
441   }
442 
shutdown()443   void shutdown() {
444     for (FsVolumeImpl volume : volumes.get()) {
445       if(volume != null) {
446         volume.shutdown();
447       }
448     }
449   }
450 }
451