1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.fs; 20 21 import java.io.*; 22 import java.util.*; 23 24 import org.apache.commons.logging.*; 25 26 import org.apache.hadoop.util.*; 27 import org.apache.hadoop.fs.FileSystem; 28 import org.apache.hadoop.fs.Path; 29 import org.apache.hadoop.util.DiskChecker.DiskErrorException; 30 import org.apache.hadoop.conf.Configuration; 31 32 /** An implementation of a round-robin scheme for disk allocation for creating 33 * files. The way it works is that it is kept track what disk was last 34 * allocated for a file write. For the current request, the next disk from 35 * the set of disks would be allocated if the free space on the disk is 36 * sufficient enough to accommodate the file that is being considered for 37 * creation. If the space requirements cannot be met, the next disk in order 38 * would be tried and so on till a disk is found with sufficient capacity. 39 * Once a disk with sufficient space is identified, a check is done to make 40 * sure that the disk is writable. Also, there is an API provided that doesn't 41 * take the space requirements into consideration but just checks whether the 42 * disk under consideration is writable (this should be used for cases where 43 * the file size is not known apriori). An API is provided to read a path that 44 * was created earlier. That API works by doing a scan of all the disks for the 45 * input pathname. 46 * This implementation also provides the functionality of having multiple 47 * allocators per JVM (one for each unique functionality or context, like 48 * mapred, dfs-client, etc.). It ensures that there is only one instance of 49 * an allocator per context per JVM. 50 * Note: 51 * 1. The contexts referred above are actually the configuration items defined 52 * in the Configuration class like "mapred.local.dir" (for which we want to 53 * control the dir allocations). The context-strings are exactly those 54 * configuration items. 55 * 2. This implementation does not take into consideration cases where 56 * a disk becomes read-only or goes out of space while a file is being written 57 * to (disks are shared between multiple processes, and so the latter situation 58 * is probable). 59 * 3. In the class implementation, "Disk" is referred to as "Dir", which 60 * actually points to the configured directory on the Disk which will be the 61 * parent for all file write/read allocations. 62 */ 63 public class LocalDirAllocator { 64 65 //A Map from the config item names like "mapred.local.dir", 66 //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This 67 //is a static object to make sure there exists exactly one instance per JVM 68 private static Map <String, AllocatorPerContext> contexts = 69 new TreeMap<String, AllocatorPerContext>(); 70 private String contextCfgItemName; 71 72 /** Used when size of file to be allocated is unknown. */ 73 public static final int SIZE_UNKNOWN = -1; 74 75 /**Create an allocator object 76 * @param contextCfgItemName 77 */ LocalDirAllocator(String contextCfgItemName)78 public LocalDirAllocator(String contextCfgItemName) { 79 this.contextCfgItemName = contextCfgItemName; 80 } 81 82 /** This method must be used to obtain the dir allocation context for a 83 * particular value of the context name. The context name must be an item 84 * defined in the Configuration object for which we want to control the 85 * dir allocations (e.g., <code>mapred.local.dir</code>). The method will 86 * create a context for that name if it doesn't already exist. 87 */ obtainContext(String contextCfgItemName)88 private AllocatorPerContext obtainContext(String contextCfgItemName) { 89 synchronized (contexts) { 90 AllocatorPerContext l = contexts.get(contextCfgItemName); 91 if (l == null) { 92 contexts.put(contextCfgItemName, 93 (l = new AllocatorPerContext(contextCfgItemName))); 94 } 95 return l; 96 } 97 } 98 99 /** Get a path from the local FS. This method should be used if the size of 100 * the file is not known apriori. We go round-robin over the set of disks 101 * (via the configured dirs) and return the first complete path where 102 * we could create the parent directory of the passed path. 103 * @param pathStr the requested path (this will be created on the first 104 * available disk) 105 * @param conf the Configuration object 106 * @return the complete path to the file on a local disk 107 * @throws IOException 108 */ getLocalPathForWrite(String pathStr, Configuration conf)109 public Path getLocalPathForWrite(String pathStr, 110 Configuration conf) throws IOException { 111 return getLocalPathForWrite(pathStr, SIZE_UNKNOWN, conf); 112 } 113 114 /** Get a path from the local FS. Pass size as 115 * SIZE_UNKNOWN if not known apriori. We 116 * round-robin over the set of disks (via the configured dirs) and return 117 * the first complete path which has enough space 118 * @param pathStr the requested path (this will be created on the first 119 * available disk) 120 * @param size the size of the file that is going to be written 121 * @param conf the Configuration object 122 * @return the complete path to the file on a local disk 123 * @throws IOException 124 */ getLocalPathForWrite(String pathStr, long size, Configuration conf)125 public Path getLocalPathForWrite(String pathStr, long size, 126 Configuration conf) throws IOException { 127 return getLocalPathForWrite(pathStr, size, conf, true); 128 } 129 130 /** Get a path from the local FS. Pass size as 131 * SIZE_UNKNOWN if not known apriori. We 132 * round-robin over the set of disks (via the configured dirs) and return 133 * the first complete path which has enough space 134 * @param pathStr the requested path (this will be created on the first 135 * available disk) 136 * @param size the size of the file that is going to be written 137 * @param conf the Configuration object 138 * @param checkWrite ensure that the path is writable 139 * @return the complete path to the file on a local disk 140 * @throws IOException 141 */ getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite)142 public Path getLocalPathForWrite(String pathStr, long size, 143 Configuration conf, 144 boolean checkWrite) throws IOException { 145 AllocatorPerContext context = obtainContext(contextCfgItemName); 146 return context.getLocalPathForWrite(pathStr, size, conf, checkWrite); 147 } 148 149 /** Get a path from the local FS for reading. We search through all the 150 * configured dirs for the file's existence and return the complete 151 * path to the file when we find one 152 * @param pathStr the requested file (this will be searched) 153 * @param conf the Configuration object 154 * @return the complete path to the file on a local disk 155 * @throws IOException 156 */ getLocalPathToRead(String pathStr, Configuration conf)157 public Path getLocalPathToRead(String pathStr, 158 Configuration conf) throws IOException { 159 AllocatorPerContext context = obtainContext(contextCfgItemName); 160 return context.getLocalPathToRead(pathStr, conf); 161 } 162 163 /** 164 * Get all of the paths that currently exist in the working directories. 165 * @param pathStr the path underneath the roots 166 * @param conf the configuration to look up the roots in 167 * @return all of the paths that exist under any of the roots 168 * @throws IOException 169 */ getAllLocalPathsToRead(String pathStr, Configuration conf )170 public Iterable<Path> getAllLocalPathsToRead(String pathStr, 171 Configuration conf 172 ) throws IOException { 173 AllocatorPerContext context; 174 synchronized (this) { 175 context = obtainContext(contextCfgItemName); 176 } 177 return context.getAllLocalPathsToRead(pathStr, conf); 178 } 179 180 /** Creates a temporary file in the local FS. Pass size as -1 if not known 181 * apriori. We round-robin over the set of disks (via the configured dirs) 182 * and select the first complete path which has enough space. A file is 183 * created on this directory. The file is guaranteed to go away when the 184 * JVM exits. 185 * @param pathStr prefix for the temporary file 186 * @param size the size of the file that is going to be written 187 * @param conf the Configuration object 188 * @return a unique temporary file 189 * @throws IOException 190 */ createTmpFileForWrite(String pathStr, long size, Configuration conf)191 public File createTmpFileForWrite(String pathStr, long size, 192 Configuration conf) throws IOException { 193 AllocatorPerContext context = obtainContext(contextCfgItemName); 194 return context.createTmpFileForWrite(pathStr, size, conf); 195 } 196 197 /** Method to check whether a context is valid 198 * @param contextCfgItemName 199 * @return true/false 200 */ isContextValid(String contextCfgItemName)201 public static boolean isContextValid(String contextCfgItemName) { 202 synchronized (contexts) { 203 return contexts.containsKey(contextCfgItemName); 204 } 205 } 206 207 /** We search through all the configured dirs for the file's existence 208 * and return true when we find 209 * @param pathStr the requested file (this will be searched) 210 * @param conf the Configuration object 211 * @return true if files exist. false otherwise 212 * @throws IOException 213 */ ifExists(String pathStr,Configuration conf)214 public boolean ifExists(String pathStr,Configuration conf) { 215 AllocatorPerContext context = obtainContext(contextCfgItemName); 216 return context.ifExists(pathStr, conf); 217 } 218 219 /** 220 * Get the current directory index for the given configuration item. 221 * @return the current directory index for the given configuration item. 222 */ getCurrentDirectoryIndex()223 int getCurrentDirectoryIndex() { 224 AllocatorPerContext context = obtainContext(contextCfgItemName); 225 return context.getCurrentDirectoryIndex(); 226 } 227 228 private static class AllocatorPerContext { 229 230 private final Log LOG = 231 LogFactory.getLog(AllocatorPerContext.class); 232 233 private int dirNumLastAccessed; 234 private Random dirIndexRandomizer = new Random(); 235 private FileSystem localFS; 236 private DF[] dirDF; 237 private String contextCfgItemName; 238 private Path[] localDirsPath; 239 private String savedLocalDirs = ""; 240 AllocatorPerContext(String contextCfgItemName)241 public AllocatorPerContext(String contextCfgItemName) { 242 this.contextCfgItemName = contextCfgItemName; 243 } 244 245 /** This method gets called everytime before any read/write to make sure 246 * that any change to localDirs is reflected immediately. 247 */ confChanged(Configuration conf )248 private synchronized void confChanged(Configuration conf 249 ) throws IOException { 250 String newLocalDirs = conf.get(contextCfgItemName); 251 if (!newLocalDirs.equals(savedLocalDirs)) { 252 String[] localDirs = conf.getStrings(contextCfgItemName); 253 localFS = FileSystem.getLocal(conf); 254 int numDirs = localDirs.length; 255 ArrayList<String> dirs = new ArrayList<String>(numDirs); 256 ArrayList<DF> dfList = new ArrayList<DF>(numDirs); 257 for (int i = 0; i < numDirs; i++) { 258 try { 259 // filter problematic directories 260 Path tmpDir = new Path(localDirs[i]); 261 if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) { 262 try { 263 DiskChecker.checkDir(new File(localDirs[i])); 264 dirs.add(localDirs[i]); 265 dfList.add(new DF(new File(localDirs[i]), 30000)); 266 } catch (DiskErrorException de) { 267 LOG.warn( localDirs[i] + "is not writable\n" + 268 StringUtils.stringifyException(de)); 269 } 270 } else { 271 LOG.warn( "Failed to create " + localDirs[i]); 272 } 273 } catch (IOException ie) { 274 LOG.warn( "Failed to create " + localDirs[i] + ": " + 275 ie.getMessage() + "\n" + StringUtils.stringifyException(ie)); 276 } //ignore 277 } 278 localDirsPath = new Path[dirs.size()]; 279 for(int i=0;i<localDirsPath.length;i++) { 280 localDirsPath[i] = new Path(dirs.get(i)); 281 } 282 dirDF = dfList.toArray(new DF[dirs.size()]); 283 savedLocalDirs = newLocalDirs; 284 285 // randomize the first disk picked in the round-robin selection 286 dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size()); 287 } 288 } 289 createPath(Path path, boolean checkWrite)290 private Path createPath(Path path, 291 boolean checkWrite) throws IOException { 292 Path file = new Path(localDirsPath[dirNumLastAccessed], path); 293 294 if (checkWrite) { 295 //check whether we are able to create a directory here. If the disk 296 //happens to be RDONLY we will fail 297 try { 298 DiskChecker.checkDir(new File(file.getParent().toUri().getPath())); 299 } catch (DiskErrorException d) { 300 LOG.warn(StringUtils.stringifyException(d)); 301 return null; 302 } 303 } 304 return file; 305 } 306 307 /** 308 * Get the current directory index. 309 * @return the current directory index. 310 */ getCurrentDirectoryIndex()311 int getCurrentDirectoryIndex() { 312 return dirNumLastAccessed; 313 } 314 315 /** Get a path from the local FS. If size is known, we go 316 * round-robin over the set of disks (via the configured dirs) and return 317 * the first complete path which has enough space. 318 * 319 * If size is not known, use roulette selection -- pick directories 320 * with probability proportional to their available space. 321 */ 322 public synchronized getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite )323 Path getLocalPathForWrite(String pathStr, long size, 324 Configuration conf, boolean checkWrite 325 ) throws IOException { 326 confChanged(conf); 327 int numDirs = localDirsPath.length; 328 int numDirsSearched = 0; 329 //remove the leading slash from the path (to make sure that the uri 330 //resolution results in a valid path on the dir being checked) 331 if (pathStr.startsWith("/")) { 332 pathStr = pathStr.substring(1); 333 } 334 Path returnPath = null; 335 Path path = new Path(pathStr); 336 337 if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability 338 //proportional to available size 339 long[] availableOnDisk = new long[dirDF.length]; 340 long totalAvailable = 0; 341 342 //build the "roulette wheel" 343 for(int i =0; i < dirDF.length; ++i) { 344 availableOnDisk[i] = dirDF[i].getAvailable(); 345 totalAvailable += availableOnDisk[i]; 346 } 347 348 // Keep rolling the wheel till we get a valid path 349 Random r = new java.util.Random(); 350 while (numDirsSearched < numDirs && returnPath == null) { 351 long randomPosition = Math.abs(r.nextLong()) % totalAvailable; 352 int dir = 0; 353 while (randomPosition > availableOnDisk[dir]) { 354 randomPosition -= availableOnDisk[dir]; 355 dir++; 356 } 357 dirNumLastAccessed = dir; 358 returnPath = createPath(path, checkWrite); 359 if (returnPath == null) { 360 totalAvailable -= availableOnDisk[dir]; 361 availableOnDisk[dir] = 0; // skip this disk 362 numDirsSearched++; 363 } 364 } 365 } else { 366 while (numDirsSearched < numDirs && returnPath == null) { 367 long capacity = dirDF[dirNumLastAccessed].getAvailable(); 368 if (capacity > size) { 369 returnPath = createPath(path, checkWrite); 370 } 371 dirNumLastAccessed++; 372 dirNumLastAccessed = dirNumLastAccessed % numDirs; 373 numDirsSearched++; 374 } 375 } 376 if (returnPath != null) { 377 return returnPath; 378 } 379 380 //no path found 381 throw new DiskErrorException("Could not find any valid local " + 382 "directory for " + pathStr); 383 } 384 385 /** Creates a file on the local FS. Pass size as 386 * {@link LocalDirAllocator.SIZE_UNKNOWN} if not known apriori. We 387 * round-robin over the set of disks (via the configured dirs) and return 388 * a file on the first path which has enough space. The file is guaranteed 389 * to go away when the JVM exits. 390 */ createTmpFileForWrite(String pathStr, long size, Configuration conf)391 public File createTmpFileForWrite(String pathStr, long size, 392 Configuration conf) throws IOException { 393 394 // find an appropriate directory 395 Path path = getLocalPathForWrite(pathStr, size, conf, true); 396 File dir = new File(path.getParent().toUri().getPath()); 397 String prefix = path.getName(); 398 399 // create a temp file on this directory 400 File result = File.createTempFile(prefix, null, dir); 401 result.deleteOnExit(); 402 return result; 403 } 404 405 /** Get a path from the local FS for reading. We search through all the 406 * configured dirs for the file's existence and return the complete 407 * path to the file when we find one 408 */ getLocalPathToRead(String pathStr, Configuration conf)409 public synchronized Path getLocalPathToRead(String pathStr, 410 Configuration conf) throws IOException { 411 confChanged(conf); 412 int numDirs = localDirsPath.length; 413 int numDirsSearched = 0; 414 //remove the leading slash from the path (to make sure that the uri 415 //resolution results in a valid path on the dir being checked) 416 if (pathStr.startsWith("/")) { 417 pathStr = pathStr.substring(1); 418 } 419 Path childPath = new Path(pathStr); 420 while (numDirsSearched < numDirs) { 421 Path file = new Path(localDirsPath[numDirsSearched], childPath); 422 if (localFS.exists(file)) { 423 return file; 424 } 425 numDirsSearched++; 426 } 427 428 //no path found 429 throw new DiskErrorException ("Could not find " + pathStr +" in any of" + 430 " the configured local directories"); 431 } 432 433 private static 434 class PathIterator implements Iterator<Path>, Iterable<Path> { 435 private final FileSystem fs; 436 private final String pathStr; 437 private int i = 0; 438 private final Path[] rootDirs; 439 private Path next = null; 440 PathIterator(FileSystem fs, String pathStr, Path[] rootDirs )441 private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs 442 ) throws IOException { 443 this.fs = fs; 444 this.pathStr = pathStr; 445 this.rootDirs = rootDirs; 446 advance(); 447 } 448 449 @Override hasNext()450 public boolean hasNext() { 451 return next != null; 452 } 453 advance()454 private void advance() throws IOException { 455 while (i < rootDirs.length) { 456 next = new Path(rootDirs[i++], pathStr); 457 if (fs.exists(next)) { 458 return; 459 } 460 } 461 next = null; 462 } 463 464 @Override next()465 public Path next() { 466 Path result = next; 467 try { 468 advance(); 469 } catch (IOException ie) { 470 throw new RuntimeException("Can't check existance of " + next, ie); 471 } 472 return result; 473 } 474 475 @Override remove()476 public void remove() { 477 throw new UnsupportedOperationException("read only iterator"); 478 } 479 480 @Override iterator()481 public Iterator<Path> iterator() { 482 return this; 483 } 484 } 485 486 /** 487 * Get all of the paths that currently exist in the working directories. 488 * @param pathStr the path underneath the roots 489 * @param conf the configuration to look up the roots in 490 * @return all of the paths that exist under any of the roots 491 * @throws IOException 492 */ getAllLocalPathsToRead(String pathStr, Configuration conf )493 synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr, 494 Configuration conf 495 ) throws IOException { 496 confChanged(conf); 497 if (pathStr.startsWith("/")) { 498 pathStr = pathStr.substring(1); 499 } 500 return new PathIterator(localFS, pathStr, localDirsPath); 501 } 502 503 /** We search through all the configured dirs for the file's existence 504 * and return true when we find one 505 */ ifExists(String pathStr,Configuration conf)506 public synchronized boolean ifExists(String pathStr,Configuration conf) { 507 try { 508 int numDirs = localDirsPath.length; 509 int numDirsSearched = 0; 510 //remove the leading slash from the path (to make sure that the uri 511 //resolution results in a valid path on the dir being checked) 512 if (pathStr.startsWith("/")) { 513 pathStr = pathStr.substring(1); 514 } 515 Path childPath = new Path(pathStr); 516 while (numDirsSearched < numDirs) { 517 Path file = new Path(localDirsPath[numDirsSearched], childPath); 518 if (localFS.exists(file)) { 519 return true; 520 } 521 numDirsSearched++; 522 } 523 } catch (IOException e) { 524 // IGNORE and try again 525 } 526 return false; 527 } 528 } 529 } 530