1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.fs;
20 
21 import java.io.*;
22 import java.util.*;
23 
24 import org.apache.commons.logging.*;
25 
26 import org.apache.hadoop.util.*;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
30 import org.apache.hadoop.conf.Configuration;
31 
/** An implementation of a round-robin scheme for disk allocation for creating
 * files. It works by keeping track of which disk was last
 * allocated for a file write. For the current request, the next disk from
 * the set of disks is allocated if the free space on that disk is
 * sufficient to accommodate the file that is being considered for
 * creation. If the space requirements cannot be met, the next disk in order
 * is tried, and so on until a disk is found with sufficient capacity.
 * Once a disk with sufficient space is identified, a check is done to make
 * sure that the disk is writable. Also, there is an API provided that doesn't
 * take the space requirements into consideration but just checks whether the
 * disk under consideration is writable (this should be used for cases where
 * the file size is not known a priori). An API is provided to read a path that
 * was created earlier. That API works by doing a scan of all the disks for the
 * input pathname.
46  * This implementation also provides the functionality of having multiple
47  * allocators per JVM (one for each unique functionality or context, like
48  * mapred, dfs-client, etc.). It ensures that there is only one instance of
49  * an allocator per context per JVM.
50  * Note:
51  * 1. The contexts referred above are actually the configuration items defined
52  * in the Configuration class like "mapred.local.dir" (for which we want to
53  * control the dir allocations). The context-strings are exactly those
54  * configuration items.
55  * 2. This implementation does not take into consideration cases where
56  * a disk becomes read-only or goes out of space while a file is being written
57  * to (disks are shared between multiple processes, and so the latter situation
58  * is probable).
59  * 3. In the class implementation, "Disk" is referred to as "Dir", which
60  * actually points to the configured directory on the Disk which will be the
61  * parent for all file write/read allocations.
62  */
63 public class LocalDirAllocator {
64 
65   //A Map from the config item names like "mapred.local.dir",
66   //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This
67   //is a static object to make sure there exists exactly one instance per JVM
68   private static Map <String, AllocatorPerContext> contexts =
69                  new TreeMap<String, AllocatorPerContext>();
70   private String contextCfgItemName;
71 
72   /** Used when size of file to be allocated is unknown. */
73   public static final int SIZE_UNKNOWN = -1;
74 
75   /**Create an allocator object
76    * @param contextCfgItemName
77    */
LocalDirAllocator(String contextCfgItemName)78   public LocalDirAllocator(String contextCfgItemName) {
79     this.contextCfgItemName = contextCfgItemName;
80   }
81 
82   /** This method must be used to obtain the dir allocation context for a
83    * particular value of the context name. The context name must be an item
84    * defined in the Configuration object for which we want to control the
85    * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
86    * create a context for that name if it doesn't already exist.
87    */
obtainContext(String contextCfgItemName)88   private AllocatorPerContext obtainContext(String contextCfgItemName) {
89     synchronized (contexts) {
90       AllocatorPerContext l = contexts.get(contextCfgItemName);
91       if (l == null) {
92         contexts.put(contextCfgItemName,
93                     (l = new AllocatorPerContext(contextCfgItemName)));
94       }
95       return l;
96     }
97   }
98 
99   /** Get a path from the local FS. This method should be used if the size of
100    *  the file is not known apriori. We go round-robin over the set of disks
101    *  (via the configured dirs) and return the first complete path where
102    *  we could create the parent directory of the passed path.
103    *  @param pathStr the requested path (this will be created on the first
104    *  available disk)
105    *  @param conf the Configuration object
106    *  @return the complete path to the file on a local disk
107    *  @throws IOException
108    */
getLocalPathForWrite(String pathStr, Configuration conf)109   public Path getLocalPathForWrite(String pathStr,
110       Configuration conf) throws IOException {
111     return getLocalPathForWrite(pathStr, SIZE_UNKNOWN, conf);
112   }
113 
114   /** Get a path from the local FS. Pass size as
115    *  SIZE_UNKNOWN if not known apriori. We
116    *  round-robin over the set of disks (via the configured dirs) and return
117    *  the first complete path which has enough space
118    *  @param pathStr the requested path (this will be created on the first
119    *  available disk)
120    *  @param size the size of the file that is going to be written
121    *  @param conf the Configuration object
122    *  @return the complete path to the file on a local disk
123    *  @throws IOException
124    */
getLocalPathForWrite(String pathStr, long size, Configuration conf)125   public Path getLocalPathForWrite(String pathStr, long size,
126       Configuration conf) throws IOException {
127 	return getLocalPathForWrite(pathStr, size, conf, true);
128   }
129 
130   /** Get a path from the local FS. Pass size as
131    *  SIZE_UNKNOWN if not known apriori. We
132    *  round-robin over the set of disks (via the configured dirs) and return
133    *  the first complete path which has enough space
134    *  @param pathStr the requested path (this will be created on the first
135    *  available disk)
136    *  @param size the size of the file that is going to be written
137    *  @param conf the Configuration object
138    *  @param checkWrite ensure that the path is writable
139    *  @return the complete path to the file on a local disk
140    *  @throws IOException
141    */
getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite)142   public Path getLocalPathForWrite(String pathStr, long size,
143                                    Configuration conf,
144                                    boolean checkWrite) throws IOException {
145     AllocatorPerContext context = obtainContext(contextCfgItemName);
146     return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
147   }
148 
149   /** Get a path from the local FS for reading. We search through all the
150    *  configured dirs for the file's existence and return the complete
151    *  path to the file when we find one
152    *  @param pathStr the requested file (this will be searched)
153    *  @param conf the Configuration object
154    *  @return the complete path to the file on a local disk
155    *  @throws IOException
156    */
getLocalPathToRead(String pathStr, Configuration conf)157   public Path getLocalPathToRead(String pathStr,
158       Configuration conf) throws IOException {
159     AllocatorPerContext context = obtainContext(contextCfgItemName);
160     return context.getLocalPathToRead(pathStr, conf);
161   }
162 
163   /**
164    * Get all of the paths that currently exist in the working directories.
165    * @param pathStr the path underneath the roots
166    * @param conf the configuration to look up the roots in
167    * @return all of the paths that exist under any of the roots
168    * @throws IOException
169    */
getAllLocalPathsToRead(String pathStr, Configuration conf )170   public Iterable<Path> getAllLocalPathsToRead(String pathStr,
171                                                Configuration conf
172                                                ) throws IOException {
173     AllocatorPerContext context;
174     synchronized (this) {
175       context = obtainContext(contextCfgItemName);
176     }
177     return context.getAllLocalPathsToRead(pathStr, conf);
178   }
179 
180   /** Creates a temporary file in the local FS. Pass size as -1 if not known
181    *  apriori. We round-robin over the set of disks (via the configured dirs)
182    *  and select the first complete path which has enough space. A file is
183    *  created on this directory. The file is guaranteed to go away when the
184    *  JVM exits.
185    *  @param pathStr prefix for the temporary file
186    *  @param size the size of the file that is going to be written
187    *  @param conf the Configuration object
188    *  @return a unique temporary file
189    *  @throws IOException
190    */
createTmpFileForWrite(String pathStr, long size, Configuration conf)191   public File createTmpFileForWrite(String pathStr, long size,
192       Configuration conf) throws IOException {
193     AllocatorPerContext context = obtainContext(contextCfgItemName);
194     return context.createTmpFileForWrite(pathStr, size, conf);
195   }
196 
197   /** Method to check whether a context is valid
198    * @param contextCfgItemName
199    * @return true/false
200    */
isContextValid(String contextCfgItemName)201   public static boolean isContextValid(String contextCfgItemName) {
202     synchronized (contexts) {
203       return contexts.containsKey(contextCfgItemName);
204     }
205   }
206 
207   /** We search through all the configured dirs for the file's existence
208    *  and return true when we find
209    *  @param pathStr the requested file (this will be searched)
210    *  @param conf the Configuration object
211    *  @return true if files exist. false otherwise
212    *  @throws IOException
213    */
ifExists(String pathStr,Configuration conf)214   public boolean ifExists(String pathStr,Configuration conf) {
215     AllocatorPerContext context = obtainContext(contextCfgItemName);
216     return context.ifExists(pathStr, conf);
217   }
218 
219   /**
220    * Get the current directory index for the given configuration item.
221    * @return the current directory index for the given configuration item.
222    */
getCurrentDirectoryIndex()223   int getCurrentDirectoryIndex() {
224     AllocatorPerContext context = obtainContext(contextCfgItemName);
225     return context.getCurrentDirectoryIndex();
226   }
227 
228   private static class AllocatorPerContext {
229 
230     private final Log LOG =
231       LogFactory.getLog(AllocatorPerContext.class);
232 
233     private int dirNumLastAccessed;
234     private Random dirIndexRandomizer = new Random();
235     private FileSystem localFS;
236     private DF[] dirDF;
237     private String contextCfgItemName;
238     private Path[] localDirsPath;
239     private String savedLocalDirs = "";
240 
AllocatorPerContext(String contextCfgItemName)241     public AllocatorPerContext(String contextCfgItemName) {
242       this.contextCfgItemName = contextCfgItemName;
243     }
244 
245     /** This method gets called everytime before any read/write to make sure
246      * that any change to localDirs is reflected immediately.
247      */
confChanged(Configuration conf )248     private synchronized void confChanged(Configuration conf
249                                           ) throws IOException {
250       String newLocalDirs = conf.get(contextCfgItemName);
251       if (!newLocalDirs.equals(savedLocalDirs)) {
252         String[] localDirs = conf.getStrings(contextCfgItemName);
253         localFS = FileSystem.getLocal(conf);
254         int numDirs = localDirs.length;
255         ArrayList<String> dirs = new ArrayList<String>(numDirs);
256         ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
257         for (int i = 0; i < numDirs; i++) {
258           try {
259             // filter problematic directories
260             Path tmpDir = new Path(localDirs[i]);
261             if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) {
262               try {
263                 DiskChecker.checkDir(new File(localDirs[i]));
264                 dirs.add(localDirs[i]);
265                 dfList.add(new DF(new File(localDirs[i]), 30000));
266               } catch (DiskErrorException de) {
267                 LOG.warn( localDirs[i] + "is not writable\n" +
268                     StringUtils.stringifyException(de));
269               }
270             } else {
271               LOG.warn( "Failed to create " + localDirs[i]);
272             }
273           } catch (IOException ie) {
274             LOG.warn( "Failed to create " + localDirs[i] + ": " +
275                 ie.getMessage() + "\n" + StringUtils.stringifyException(ie));
276           } //ignore
277         }
278         localDirsPath = new Path[dirs.size()];
279         for(int i=0;i<localDirsPath.length;i++) {
280           localDirsPath[i] = new Path(dirs.get(i));
281         }
282         dirDF = dfList.toArray(new DF[dirs.size()]);
283         savedLocalDirs = newLocalDirs;
284 
285         // randomize the first disk picked in the round-robin selection
286         dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size());
287       }
288     }
289 
createPath(Path path, boolean checkWrite)290     private Path createPath(Path path,
291     		boolean checkWrite) throws IOException {
292       Path file = new Path(localDirsPath[dirNumLastAccessed], path);
293 
294       if (checkWrite) {
295     	//check whether we are able to create a directory here. If the disk
296     	//happens to be RDONLY we will fail
297     	try {
298           DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
299     	} catch (DiskErrorException d) {
300           LOG.warn(StringUtils.stringifyException(d));
301           return null;
302     	}
303       }
304       return file;
305     }
306 
307     /**
308      * Get the current directory index.
309      * @return the current directory index.
310      */
getCurrentDirectoryIndex()311     int getCurrentDirectoryIndex() {
312       return dirNumLastAccessed;
313     }
314 
315     /** Get a path from the local FS. If size is known, we go
316      *  round-robin over the set of disks (via the configured dirs) and return
317      *  the first complete path which has enough space.
318      *
319      *  If size is not known, use roulette selection -- pick directories
320      *  with probability proportional to their available space.
321      */
322     public synchronized
getLocalPathForWrite(String pathStr, long size, Configuration conf, boolean checkWrite )323     Path getLocalPathForWrite(String pathStr, long size,
324     	                      Configuration conf, boolean checkWrite
325     	                      ) throws IOException {
326       confChanged(conf);
327       int numDirs = localDirsPath.length;
328       int numDirsSearched = 0;
329       //remove the leading slash from the path (to make sure that the uri
330       //resolution results in a valid path on the dir being checked)
331       if (pathStr.startsWith("/")) {
332         pathStr = pathStr.substring(1);
333       }
334       Path returnPath = null;
335       Path path = new Path(pathStr);
336 
337       if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability
338                     //proportional to available size
339         long[] availableOnDisk = new long[dirDF.length];
340         long totalAvailable = 0;
341 
342             //build the "roulette wheel"
343         for(int i =0; i < dirDF.length; ++i) {
344           availableOnDisk[i] = dirDF[i].getAvailable();
345           totalAvailable += availableOnDisk[i];
346         }
347 
348         // Keep rolling the wheel till we get a valid path
349         Random r = new java.util.Random();
350         while (numDirsSearched < numDirs && returnPath == null) {
351           long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
352           int dir = 0;
353           while (randomPosition > availableOnDisk[dir]) {
354             randomPosition -= availableOnDisk[dir];
355             dir++;
356           }
357           dirNumLastAccessed = dir;
358           returnPath = createPath(path, checkWrite);
359           if (returnPath == null) {
360             totalAvailable -= availableOnDisk[dir];
361             availableOnDisk[dir] = 0; // skip this disk
362             numDirsSearched++;
363           }
364         }
365       } else {
366         while (numDirsSearched < numDirs && returnPath == null) {
367           long capacity = dirDF[dirNumLastAccessed].getAvailable();
368           if (capacity > size) {
369         	  returnPath = createPath(path, checkWrite);
370           }
371           dirNumLastAccessed++;
372           dirNumLastAccessed = dirNumLastAccessed % numDirs;
373           numDirsSearched++;
374         }
375       }
376       if (returnPath != null) {
377         return returnPath;
378       }
379 
380       //no path found
381       throw new DiskErrorException("Could not find any valid local " +
382           "directory for " + pathStr);
383     }
384 
385     /** Creates a file on the local FS. Pass size as
386      * {@link LocalDirAllocator.SIZE_UNKNOWN} if not known apriori. We
387      *  round-robin over the set of disks (via the configured dirs) and return
388      *  a file on the first path which has enough space. The file is guaranteed
389      *  to go away when the JVM exits.
390      */
createTmpFileForWrite(String pathStr, long size, Configuration conf)391     public File createTmpFileForWrite(String pathStr, long size,
392         Configuration conf) throws IOException {
393 
394       // find an appropriate directory
395       Path path = getLocalPathForWrite(pathStr, size, conf, true);
396       File dir = new File(path.getParent().toUri().getPath());
397       String prefix = path.getName();
398 
399       // create a temp file on this directory
400       File result = File.createTempFile(prefix, null, dir);
401       result.deleteOnExit();
402       return result;
403     }
404 
405     /** Get a path from the local FS for reading. We search through all the
406      *  configured dirs for the file's existence and return the complete
407      *  path to the file when we find one
408      */
getLocalPathToRead(String pathStr, Configuration conf)409     public synchronized Path getLocalPathToRead(String pathStr,
410         Configuration conf) throws IOException {
411       confChanged(conf);
412       int numDirs = localDirsPath.length;
413       int numDirsSearched = 0;
414       //remove the leading slash from the path (to make sure that the uri
415       //resolution results in a valid path on the dir being checked)
416       if (pathStr.startsWith("/")) {
417         pathStr = pathStr.substring(1);
418       }
419       Path childPath = new Path(pathStr);
420       while (numDirsSearched < numDirs) {
421     	 Path file = new Path(localDirsPath[numDirsSearched], childPath);
422         if (localFS.exists(file)) {
423           return file;
424         }
425         numDirsSearched++;
426       }
427 
428       //no path found
429       throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
430       " the configured local directories");
431     }
432 
433     private static
434     class PathIterator implements Iterator<Path>, Iterable<Path> {
435       private final FileSystem fs;
436       private final String pathStr;
437       private int i = 0;
438       private final Path[] rootDirs;
439       private Path next = null;
440 
PathIterator(FileSystem fs, String pathStr, Path[] rootDirs )441       private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs
442                            ) throws IOException {
443         this.fs = fs;
444         this.pathStr = pathStr;
445         this.rootDirs = rootDirs;
446         advance();
447       }
448 
449       @Override
hasNext()450       public boolean hasNext() {
451         return next != null;
452       }
453 
advance()454       private void advance() throws IOException {
455         while (i < rootDirs.length) {
456           next = new Path(rootDirs[i++], pathStr);
457           if (fs.exists(next)) {
458             return;
459           }
460         }
461         next = null;
462       }
463 
464       @Override
next()465       public Path next() {
466         Path result = next;
467         try {
468           advance();
469         } catch (IOException ie) {
470           throw new RuntimeException("Can't check existance of " + next, ie);
471         }
472         return result;
473       }
474 
475       @Override
remove()476       public void remove() {
477         throw new UnsupportedOperationException("read only iterator");
478       }
479 
480       @Override
iterator()481       public Iterator<Path> iterator() {
482         return this;
483       }
484     }
485 
486     /**
487      * Get all of the paths that currently exist in the working directories.
488      * @param pathStr the path underneath the roots
489      * @param conf the configuration to look up the roots in
490      * @return all of the paths that exist under any of the roots
491      * @throws IOException
492      */
getAllLocalPathsToRead(String pathStr, Configuration conf )493     synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
494                                                        Configuration conf
495                                                        ) throws IOException {
496       confChanged(conf);
497       if (pathStr.startsWith("/")) {
498         pathStr = pathStr.substring(1);
499       }
500       return new PathIterator(localFS, pathStr, localDirsPath);
501     }
502 
503     /** We search through all the configured dirs for the file's existence
504      *  and return true when we find one
505      */
ifExists(String pathStr,Configuration conf)506     public synchronized boolean ifExists(String pathStr,Configuration conf) {
507       try {
508         int numDirs = localDirsPath.length;
509         int numDirsSearched = 0;
510         //remove the leading slash from the path (to make sure that the uri
511         //resolution results in a valid path on the dir being checked)
512         if (pathStr.startsWith("/")) {
513           pathStr = pathStr.substring(1);
514         }
515         Path childPath = new Path(pathStr);
516         while (numDirsSearched < numDirs) {
517 	   Path file = new Path(localDirsPath[numDirsSearched], childPath);
518           if (localFS.exists(file)) {
519             return true;
520           }
521           numDirsSearched++;
522         }
523       } catch (IOException e) {
524         // IGNORE and try again
525       }
526       return false;
527     }
528   }
529 }
530