1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
20 
21 import java.util.HashMap;
22 import java.util.LinkedList;
23 import java.util.Queue;
24 
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.yarn.conf.YarnConfiguration;
28 
29 import com.google.common.annotations.VisibleForTesting;
30 
31 /**
32  * {@link LocalCacheDirectoryManager} is used for managing hierarchical
33  * directories for local cache. It will allow to restrict the number of files in
34  * a directory to
35  * {@link YarnConfiguration#NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY} which
36  * includes 36 sub-directories (named from 0 to 9 and a to z). Root directory is
37  * represented by an empty string. It internally maintains a vacant directory
38  * queue. As soon as the file count for the directory reaches its limit; new
39  * files will not be created in it until at least one file is deleted from it.
40  * New sub directories are not created unless a
41  * {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request
42  * is made and nonFullDirectories are empty.
43  *
44  * Note : this structure only returns relative localization path but doesn't
45  * create one on disk.
46  */
47 public class LocalCacheDirectoryManager {
48 
49   private final int perDirectoryFileLimit;
50   // total 36 = a to z plus 0 to 9
51   public static final int DIRECTORIES_PER_LEVEL = 36;
52 
53   private Queue<Directory> nonFullDirectories;
54   private HashMap<String, Directory> knownDirectories;
55   private int totalSubDirectories;
56 
LocalCacheDirectoryManager(Configuration conf)57   public LocalCacheDirectoryManager(Configuration conf) {
58     totalSubDirectories = 0;
59     Directory rootDir = new Directory(totalSubDirectories);
60     nonFullDirectories = new LinkedList<Directory>();
61     knownDirectories = new HashMap<String, Directory>();
62     knownDirectories.put("", rootDir);
63     nonFullDirectories.add(rootDir);
64     this.perDirectoryFileLimit =
65         conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
66           YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY) - 36;
67   }
68 
69   /**
70    * This method will return relative path from the first available vacant
71    * directory.
72    *
73    * @return {@link String} relative path for localization
74    */
getRelativePathForLocalization()75   public synchronized String getRelativePathForLocalization() {
76     if (nonFullDirectories.isEmpty()) {
77       totalSubDirectories++;
78       Directory newDir = new Directory(totalSubDirectories);
79       nonFullDirectories.add(newDir);
80       knownDirectories.put(newDir.getRelativePath(), newDir);
81     }
82     Directory subDir = nonFullDirectories.peek();
83     if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
84       nonFullDirectories.remove();
85     }
86     return subDir.getRelativePath();
87   }
88 
89   /**
90    * This method will reduce the file count for the directory represented by
91    * path. The root directory of this Local cache directory manager is
92    * represented by an empty string.
93    */
decrementFileCountForPath(String relPath)94   public synchronized void decrementFileCountForPath(String relPath) {
95     relPath = relPath == null ? "" : relPath.trim();
96     Directory subDir = knownDirectories.get(relPath);
97     int oldCount = subDir.getCount();
98     if (subDir.decrementAndGetCount() < perDirectoryFileLimit
99         && oldCount >= perDirectoryFileLimit) {
100       nonFullDirectories.add(subDir);
101     }
102   }
103 
104   /**
105    * Increment the file count for a relative directory within the cache
106    *
107    * @param relPath the relative path
108    */
incrementFileCountForPath(String relPath)109   public synchronized void incrementFileCountForPath(String relPath) {
110     relPath = relPath == null ? "" : relPath.trim();
111     Directory subDir = knownDirectories.get(relPath);
112     if (subDir == null) {
113       int dirnum = Directory.getDirectoryNumber(relPath);
114       totalSubDirectories = Math.max(dirnum, totalSubDirectories);
115       subDir = new Directory(dirnum);
116       nonFullDirectories.add(subDir);
117       knownDirectories.put(subDir.getRelativePath(), subDir);
118     }
119     if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
120       nonFullDirectories.remove(subDir);
121     }
122   }
123 
124   /**
125    * Given a path to a directory within a local cache tree return the
126    * root of the cache directory.
127    *
128    * @param path the directory within a cache directory
129    * @return the local cache directory root or null if not found
130    */
getCacheDirectoryRoot(Path path)131   public static Path getCacheDirectoryRoot(Path path) {
132     while (path != null) {
133       String name = path.getName();
134       if (name.length() != 1) {
135         return path;
136       }
137       int dirnum = DIRECTORIES_PER_LEVEL;
138       try {
139         dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL);
140       } catch (NumberFormatException e) {
141       }
142       if (dirnum >= DIRECTORIES_PER_LEVEL) {
143         return path;
144       }
145       path = path.getParent();
146     }
147     return path;
148   }
149 
150   @VisibleForTesting
getDirectory(String relPath)151   synchronized Directory getDirectory(String relPath) {
152     return knownDirectories.get(relPath);
153   }
154 
155   /*
156    * It limits the number of files and sub directories in the directory to the
157    * limit LocalCacheDirectoryManager#perDirectoryFileLimit.
158    */
159   static class Directory {
160 
161     private final String relativePath;
162     private int fileCount;
163 
getRelativePath(int directoryNo)164     static String getRelativePath(int directoryNo) {
165       String relativePath = "";
166       if (directoryNo > 0) {
167         String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
168         StringBuffer sb = new StringBuffer();
169         if (tPath.length() == 1) {
170           sb.append(tPath.charAt(0));
171         } else {
172           // this is done to make sure we also reuse 0th sub directory
173           sb.append(Integer.toString(
174             Integer.parseInt(tPath.substring(0, 1), DIRECTORIES_PER_LEVEL) - 1,
175             DIRECTORIES_PER_LEVEL));
176         }
177         for (int i = 1; i < tPath.length(); i++) {
178           sb.append(Path.SEPARATOR).append(tPath.charAt(i));
179         }
180         relativePath = sb.toString();
181       }
182       return relativePath;
183     }
184 
getDirectoryNumber(String relativePath)185     static int getDirectoryNumber(String relativePath) {
186       String numStr = relativePath.replace("/", "");
187       if (relativePath.isEmpty()) {
188         return 0;
189       }
190       if (numStr.length() > 1) {
191         // undo step from getRelativePath() to reuse 0th sub directory
192         String firstChar = Integer.toString(
193             Integer.parseInt(numStr.substring(0, 1),
194                 DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL);
195         numStr = firstChar + numStr.substring(1);
196       }
197       return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1;
198     }
199 
Directory(int directoryNo)200     public Directory(int directoryNo) {
201       fileCount = 0;
202       relativePath = getRelativePath(directoryNo);
203     }
204 
incrementAndGetCount()205     public int incrementAndGetCount() {
206       return ++fileCount;
207     }
208 
decrementAndGetCount()209     public int decrementAndGetCount() {
210       return --fileCount;
211     }
212 
getRelativePath()213     public String getRelativePath() {
214       return relativePath;
215     }
216 
getCount()217     public int getCount() {
218       return fileCount;
219     }
220   }
221 }