/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;

/**
 * The set of {@link FsVolumeImpl} instances managed together, plus a record
 * of the volumes that have failed.
 *
 * The current volumes are held as an array inside an {@link AtomicReference}.
 * The array is never modified in place: every addition or removal builds a
 * new array and installs it with {@code compareAndSet}, retrying when another
 * thread changed the list in between. Readers therefore work on whatever
 * array they obtained from {@code volumes.get()} without further locking.
 */
class FsVolumeList {
  private final AtomicReference<FsVolumeImpl[]> volumes =
      new AtomicReference<>(new FsVolumeImpl[0]);
  // Tracks volume failures, sorted by volume path.
  private final Map<String, VolumeFailureInfo> volumeFailureInfos =
      Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
  // Lock object that serializes checkDirs() calls.
  private final Object checkDirsMutex = new Object();

  private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
  private final BlockScanner blockScanner;

  /**
   * Creates an empty volume list.
   *
   * @param initialVolumeFailureInfos failures already known at construction
   *          time; each is recorded under its failed storage location.
   * @param blockScanner scanner that is told about added and removed
   *          volumes; may be null, in which case no scanner is notified.
   * @param blockChooser policy used to pick a volume for a new block.
   */
  FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
      BlockScanner blockScanner,
      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
    this.blockChooser = blockChooser;
    this.blockScanner = blockScanner;
    for (VolumeFailureInfo volumeFailureInfo : initialVolumeFailureInfos) {
      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
          volumeFailureInfo);
    }
  }

  /**
   * Return an unmodifiable list view of the volumes present at the time of
   * the call.
   */
  List<FsVolumeImpl> getVolumes() {
    return Collections.unmodifiableList(Arrays.asList(volumes.get()));
  }

  /**
   * Asks the choosing policy for a volume and obtains a reference to it,
   * dropping volumes that turn out to be closed and asking again.
   *
   * @param list candidate volumes; must be modifiable, because closed
   *          volumes are removed from it.
   * @param blockSize free space needed on the volume.
   * @return a reference to the chosen volume.
   * @throws IOException if the policy cannot choose a volume.
   */
  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
      long blockSize) throws IOException {
    while (true) {
      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
      try {
        return volume.obtainReference();
      } catch (ClosedChannelException e) {
        FsDatasetImpl.LOG.warn("Chosen a closed volume: " + volume);
        // blockChooser.chooseVolume throws DiskOutOfSpaceException when the
        // list is empty, indicating that all volumes are closed. That is
        // what ends this loop once every candidate has been removed.
        list.remove(volume);
      }
    }
  }

  /**
   * Get next volume of the given storage type.
   *
   * @param storageType the desired {@link StorageType}
   * @param blockSize free space needed on the volume
   * @return next volume to store the block in.
   * @throws IOException if no volume can be chosen.
   */
  FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
      throws IOException {
    // Get a snapshot of currently available volumes.
    final FsVolumeImpl[] curVolumes = volumes.get();
    final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
    for (FsVolumeImpl v : curVolumes) {
      if (v.getStorageType() == storageType) {
        list.add(v);
      }
    }
    return chooseVolume(list, blockSize);
  }

  /**
   * Get next volume, considering only volumes that report themselves as
   * transient storage.
   *
   * @param blockSize free space needed on the volume
   * @return next volume to store the block in.
   * @throws IOException if no volume can be chosen.
   */
  FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
    // Get a snapshot of currently available volumes.
    final FsVolumeImpl[] curVolumes = volumes.get();
    final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.length);
    for (FsVolumeImpl v : curVolumes) {
      if (v.isTransientStorage()) {
        list.add(v);
      }
    }
    return chooseVolume(list, blockSize);
  }

  /**
   * Sums {@code getDfsUsed()} over all volumes, skipping volumes that are
   * already closed.
   */
  long getDfsUsed() throws IOException {
    long dfsUsed = 0L;
    for (FsVolumeImpl v : volumes.get()) {
      try (FsVolumeReference ref = v.obtainReference()) {
        dfsUsed += v.getDfsUsed();
      } catch (ClosedChannelException e) {
        // ignore: a closed volume does not contribute to the total.
      }
    }
    return dfsUsed;
  }

  /**
   * Sums {@code getBlockPoolUsed(bpid)} over all volumes, skipping volumes
   * that are already closed.
   *
   * @param bpid block pool id.
   */
  long getBlockPoolUsed(String bpid) throws IOException {
    long dfsUsed = 0L;
    for (FsVolumeImpl v : volumes.get()) {
      try (FsVolumeReference ref = v.obtainReference()) {
        dfsUsed += v.getBlockPoolUsed(bpid);
      } catch (ClosedChannelException e) {
        // ignore: a closed volume does not contribute to the total.
      }
    }
    return dfsUsed;
  }

  /**
   * Sums {@code getCapacity()} over all volumes. A volume whose reference
   * cannot be obtained or released is skipped.
   */
  long getCapacity() {
    long capacity = 0L;
    for (FsVolumeImpl v : volumes.get()) {
      try (FsVolumeReference ref = v.obtainReference()) {
        capacity += v.getCapacity();
      } catch (IOException e) {
        // ignore: this method does not declare IOException, so any failure
        // to obtain or close the reference is dropped here.
      }
    }
    return capacity;
  }

  /**
   * Sums {@code getAvailable()} over all volumes, skipping volumes that are
   * already closed.
   */
  long getRemaining() throws IOException {
    long remaining = 0L;
    for (FsVolumeSpi vol : volumes.get()) {
      try (FsVolumeReference ref = vol.obtainReference()) {
        remaining += vol.getAvailable();
      } catch (ClosedChannelException e) {
        // ignore: a closed volume does not contribute to the total.
      }
    }
    return remaining;
  }

  /**
   * Populates the replica map for a block pool from every volume, using one
   * thread per volume, and waits for all of them to finish.
   *
   * @param bpid block pool id.
   * @param volumeMap map that each volume adds its replicas to.
   * @param ramDiskReplicaMap tracker passed through to each volume.
   * @throws IOException the first exception reported by any volume, or a
   *           wrapper around {@link InterruptedException} if the wait was
   *           interrupted.
   */
  void getAllVolumesMap(final String bpid,
      final ReplicaMap volumeMap,
      final RamDiskReplicaTracker ramDiskReplicaMap)
      throws IOException {
    long totalStartTime = Time.monotonicNow();
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> replicaAddingThreads = new ArrayList<>();
    for (final FsVolumeImpl v : volumes.get()) {
      Thread t = new Thread() {
        @Override
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
                bpid + " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
                + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
          } catch (ClosedChannelException e) {
            FsDatasetImpl.LOG.info("The volume " + v + " is closed while " +
                "adding replicas, ignored.");
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
                "from " + v + ". Will throw later.", ioe);
            exceptions.add(ioe);
          }
        }
      };
      replicaAddingThreads.add(t);
      t.start();
    }
    joinAndRethrow(replicaAddingThreads, exceptions);
    long totalTimeTaken = Time.monotonicNow() - totalStartTime;
    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
        + totalTimeTaken + "ms");
  }

  /**
   * Waits for every worker thread to finish, then rethrows the first
   * exception the workers collected, if any.
   *
   * @param threads started worker threads.
   * @param exceptions exceptions recorded by the workers.
   * @throws IOException the first recorded exception, or a wrapper around
   *           {@link InterruptedException} if the wait was interrupted.
   */
  private static void joinAndRethrow(List<Thread> threads,
      List<IOException> exceptions) throws IOException {
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        // Restore the interrupt status so callers further up can still
        // observe the interruption after it is wrapped in an IOException.
        Thread.currentThread().interrupt();
        throw new IOException(ie);
      }
    }
    if (!exceptions.isEmpty()) {
      throw exceptions.get(0);
    }
  }

  /**
   * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
   *
   * Use checkDirsMutex to allow only one instance of checkDirs() call
   *
   * @return the base paths of all volumes that failed the check and were
   *         removed, or {@code null} when no volume failed.
   */
  Set<File> checkDirs() {
    synchronized (checkDirsMutex) {
      Set<File> failedVols = null;

      // Iterate over a snapshot; failed volumes are removed from the live
      // list through removeVolume(), not from this snapshot.
      final List<FsVolumeImpl> volumeList = getVolumes();

      for (final FsVolumeImpl fsv : volumeList) {
        try (FsVolumeReference ref = fsv.obtainReference()) {
          fsv.checkDirs();
        } catch (DiskErrorException e) {
          // The try-with-resources has already closed ref by the time this
          // handler runs, so this thread holds no reference while the
          // volume is being removed.
          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
          if (failedVols == null) {
            failedVols = new HashSet<>(1);
          }
          failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
          addVolumeFailureInfo(fsv);
          removeVolume(fsv);
        } catch (ClosedChannelException e) {
          FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
              "reference count on closed volume", e);
        } catch (IOException e) {
          FsDatasetImpl.LOG.error("Unexpected IOException", e);
        }
      }

      if (failedVols != null && !failedVols.isEmpty()) {
        FsDatasetImpl.LOG.warn("Completed checkDirs. Found "
            + failedVols.size() + " failure volumes.");
      }

      return failedVols;
    }
  }

  @Override
  public String toString() {
    return Arrays.toString(volumes.get());
  }

  /**
   * Dynamically add new volumes to the existing volumes that this DN manages.
   *
   * @param ref a reference to the new FsVolumeImpl instance.
   */
  void addVolume(FsVolumeReference ref) {
    // Capture the volume once. Further down, ref is either handed to the
    // block scanner or closed, so it is not read again after that point.
    final FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
    while (true) {
      final FsVolumeImpl[] curVolumes = volumes.get();
      final FsVolumeImpl[] newVolumes =
          Arrays.copyOf(curVolumes, curVolumes.length + 1);
      newVolumes[curVolumes.length] = volume;
      if (volumes.compareAndSet(curVolumes, newVolumes)) {
        break;
      }
      if (FsDatasetImpl.LOG.isDebugEnabled()) {
        FsDatasetImpl.LOG.debug(
            "The volume list has been changed concurrently, " +
            "retry to add volume: " + volume.getStorageID());
      }
    }
    if (blockScanner != null) {
      blockScanner.addVolumeScanner(ref);
    } else {
      // If the volume is not put into a volume scanner, it does not need to
      // hold the reference.
      IOUtils.cleanup(FsDatasetImpl.LOG, ref);
    }
    // If the volume is used to replace a failed volume, it needs to reset the
    // volume failure info for this volume.
    removeVolumeFailureInfo(new File(volume.getBasePath()));
    FsDatasetImpl.LOG.info("Added new volume: " + volume.getStorageID());
  }

  /**
   * Dynamically remove a volume in the list.
   *
   * Once the volume has been taken out of the list, its scanner is removed
   * (when a block scanner is configured), then the volume is closed and
   * shut down. If the volume is not in the list, nothing happens.
   *
   * @param target the volume instance to be removed.
   */
  private void removeVolume(FsVolumeImpl target) {
    while (true) {
      final FsVolumeImpl[] curVolumes = volumes.get();
      final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
      if (!volumeList.remove(target)) {
        if (FsDatasetImpl.LOG.isDebugEnabled()) {
          FsDatasetImpl.LOG.debug("Volume " + target +
              " does not exist or is removed by others.");
        }
        return;
      }
      if (volumes.compareAndSet(curVolumes,
          volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
        if (blockScanner != null) {
          blockScanner.removeVolumeScanner(target);
        }
        try {
          target.closeAndWait();
        } catch (IOException e) {
          FsDatasetImpl.LOG.warn(
              "Error occurs when waiting volume to close: " + target, e);
        }
        target.shutdown();
        FsDatasetImpl.LOG.info("Removed volume: " + target);
        return;
      }
      if (FsDatasetImpl.LOG.isDebugEnabled()) {
        FsDatasetImpl.LOG.debug(
            "The volume list has been changed concurrently, " +
            "retry to remove volume: " + target);
      }
    }
  }

  /**
   * Dynamically remove volume in the list.
   *
   * Every volume whose absolute base path equals the absolute path of
   * {@code volume} is removed.
   *
   * @param volume the volume to be removed.
   * @param clearFailure set true to remove failure info for this volume.
   */
  void removeVolume(File volume, boolean clearFailure) {
    final String targetPath = volume.getAbsolutePath();
    // Iterate over the current snapshot; removeVolume(FsVolumeImpl) installs
    // a new array and never modifies the one being walked here.
    for (FsVolumeImpl fsVolume : volumes.get()) {
      String basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
      if (basePath.equals(targetPath)) {
        // Remove by instance, so that the removed volume is the one that was
        // seen in the snapshot.
        removeVolume(fsVolume);
      }
    }
    if (clearFailure) {
      removeVolumeFailureInfo(volume);
    }
  }

  /**
   * @return the recorded volume failures as an array, in the iteration
   *         order of the underlying sorted map.
   */
  VolumeFailureInfo[] getVolumeFailureInfos() {
    Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
    return infos.toArray(new VolumeFailureInfo[infos.size()]);
  }

  /**
   * Records a volume failure under its failed storage location, replacing
   * any earlier record for the same location.
   *
   * @param volumeFailureInfo the failure to record.
   */
  void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
    volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
        volumeFailureInfo);
  }

  /**
   * Records a failure for the given volume, using its absolute base path,
   * the current time and its capacity.
   */
  private void addVolumeFailureInfo(FsVolumeImpl vol) {
    addVolumeFailureInfo(new VolumeFailureInfo(
        new File(vol.getBasePath()).getAbsolutePath(),
        Time.now(),
        vol.getCapacity()));
  }

  /** Forgets any failure recorded for the absolute path of {@code vol}. */
  private void removeVolumeFailureInfo(File vol) {
    volumeFailureInfos.remove(vol.getAbsolutePath());
  }

  /**
   * Adds a block pool to every volume, using one thread per volume, and
   * waits for all of them to finish.
   *
   * @param bpid block pool id.
   * @param conf configuration passed through to each volume.
   * @throws IOException the first exception reported by any volume, or a
   *           wrapper around {@link InterruptedException} if the wait was
   *           interrupted.
   */
  void addBlockPool(final String bpid, final Configuration conf)
      throws IOException {
    long totalStartTime = Time.monotonicNow();

    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<>();
    for (final FsVolumeImpl v : volumes.get()) {
      Thread t = new Thread() {
        @Override
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            v.addBlockPool(bpid, conf);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                " on " + v + ": " + timeTaken + "ms");
          } catch (ClosedChannelException e) {
            // ignore: the volume was closed before it could be scanned.
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
            exceptions.add(ioe);
          }
        }
      };
      blockPoolAddingThreads.add(t);
      t.start();
    }
    joinAndRethrow(blockPoolAddingThreads, exceptions);

    long totalTimeTaken = Time.monotonicNow() - totalStartTime;
    FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
        bpid + ": " + totalTimeTaken + "ms");
  }

  /**
   * Calls {@code shutdownBlockPool(bpid)} on every volume.
   *
   * @param bpid block pool id.
   */
  void removeBlockPool(String bpid) {
    for (FsVolumeImpl v : volumes.get()) {
      v.shutdownBlockPool(bpid);
    }
  }

  /** Calls {@code shutdown()} on every non-null volume in the list. */
  void shutdown() {
    for (FsVolumeImpl volume : volumes.get()) {
      if (volume != null) {
        volume.shutdown();
      }
    }
  }
}