1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; 20 21 import java.util.HashMap; 22 import java.util.LinkedList; 23 import java.util.Queue; 24 25 import org.apache.hadoop.conf.Configuration; 26 import org.apache.hadoop.fs.Path; 27 import org.apache.hadoop.yarn.conf.YarnConfiguration; 28 29 import com.google.common.annotations.VisibleForTesting; 30 31 /** 32 * {@link LocalCacheDirectoryManager} is used for managing hierarchical 33 * directories for local cache. It will allow to restrict the number of files in 34 * a directory to 35 * {@link YarnConfiguration#NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY} which 36 * includes 36 sub-directories (named from 0 to 9 and a to z). Root directory is 37 * represented by an empty string. It internally maintains a vacant directory 38 * queue. As soon as the file count for the directory reaches its limit; new 39 * files will not be created in it until at least one file is deleted from it. 40 * New sub directories are not created unless a 41 * {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request 42 * is made and nonFullDirectories are empty. 43 * 44 * Note : this structure only returns relative localization path but doesn't 45 * create one on disk. 46 */ 47 public class LocalCacheDirectoryManager { 48 49 private final int perDirectoryFileLimit; 50 // total 36 = a to z plus 0 to 9 51 public static final int DIRECTORIES_PER_LEVEL = 36; 52 53 private Queue<Directory> nonFullDirectories; 54 private HashMap<String, Directory> knownDirectories; 55 private int totalSubDirectories; 56 LocalCacheDirectoryManager(Configuration conf)57 public LocalCacheDirectoryManager(Configuration conf) { 58 totalSubDirectories = 0; 59 Directory rootDir = new Directory(totalSubDirectories); 60 nonFullDirectories = new LinkedList<Directory>(); 61 knownDirectories = new HashMap<String, Directory>(); 62 knownDirectories.put("", rootDir); 63 nonFullDirectories.add(rootDir); 64 this.perDirectoryFileLimit = 65 conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, 66 YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY) - 36; 67 } 68 69 /** 70 * This method will return relative path from the first available vacant 71 * directory. 72 * 73 * @return {@link String} relative path for localization 74 */ getRelativePathForLocalization()75 public synchronized String getRelativePathForLocalization() { 76 if (nonFullDirectories.isEmpty()) { 77 totalSubDirectories++; 78 Directory newDir = new Directory(totalSubDirectories); 79 nonFullDirectories.add(newDir); 80 knownDirectories.put(newDir.getRelativePath(), newDir); 81 } 82 Directory subDir = nonFullDirectories.peek(); 83 if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) { 84 nonFullDirectories.remove(); 85 } 86 return subDir.getRelativePath(); 87 } 88 89 /** 90 * This method will reduce the file count for the directory represented by 91 * path. The root directory of this Local cache directory manager is 92 * represented by an empty string. 93 */ decrementFileCountForPath(String relPath)94 public synchronized void decrementFileCountForPath(String relPath) { 95 relPath = relPath == null ? "" : relPath.trim(); 96 Directory subDir = knownDirectories.get(relPath); 97 int oldCount = subDir.getCount(); 98 if (subDir.decrementAndGetCount() < perDirectoryFileLimit 99 && oldCount >= perDirectoryFileLimit) { 100 nonFullDirectories.add(subDir); 101 } 102 } 103 104 /** 105 * Increment the file count for a relative directory within the cache 106 * 107 * @param relPath the relative path 108 */ incrementFileCountForPath(String relPath)109 public synchronized void incrementFileCountForPath(String relPath) { 110 relPath = relPath == null ? "" : relPath.trim(); 111 Directory subDir = knownDirectories.get(relPath); 112 if (subDir == null) { 113 int dirnum = Directory.getDirectoryNumber(relPath); 114 totalSubDirectories = Math.max(dirnum, totalSubDirectories); 115 subDir = new Directory(dirnum); 116 nonFullDirectories.add(subDir); 117 knownDirectories.put(subDir.getRelativePath(), subDir); 118 } 119 if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) { 120 nonFullDirectories.remove(subDir); 121 } 122 } 123 124 /** 125 * Given a path to a directory within a local cache tree return the 126 * root of the cache directory. 127 * 128 * @param path the directory within a cache directory 129 * @return the local cache directory root or null if not found 130 */ getCacheDirectoryRoot(Path path)131 public static Path getCacheDirectoryRoot(Path path) { 132 while (path != null) { 133 String name = path.getName(); 134 if (name.length() != 1) { 135 return path; 136 } 137 int dirnum = DIRECTORIES_PER_LEVEL; 138 try { 139 dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL); 140 } catch (NumberFormatException e) { 141 } 142 if (dirnum >= DIRECTORIES_PER_LEVEL) { 143 return path; 144 } 145 path = path.getParent(); 146 } 147 return path; 148 } 149 150 @VisibleForTesting getDirectory(String relPath)151 synchronized Directory getDirectory(String relPath) { 152 return knownDirectories.get(relPath); 153 } 154 155 /* 156 * It limits the number of files and sub directories in the directory to the 157 * limit LocalCacheDirectoryManager#perDirectoryFileLimit. 158 */ 159 static class Directory { 160 161 private final String relativePath; 162 private int fileCount; 163 getRelativePath(int directoryNo)164 static String getRelativePath(int directoryNo) { 165 String relativePath = ""; 166 if (directoryNo > 0) { 167 String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL); 168 StringBuffer sb = new StringBuffer(); 169 if (tPath.length() == 1) { 170 sb.append(tPath.charAt(0)); 171 } else { 172 // this is done to make sure we also reuse 0th sub directory 173 sb.append(Integer.toString( 174 Integer.parseInt(tPath.substring(0, 1), DIRECTORIES_PER_LEVEL) - 1, 175 DIRECTORIES_PER_LEVEL)); 176 } 177 for (int i = 1; i < tPath.length(); i++) { 178 sb.append(Path.SEPARATOR).append(tPath.charAt(i)); 179 } 180 relativePath = sb.toString(); 181 } 182 return relativePath; 183 } 184 getDirectoryNumber(String relativePath)185 static int getDirectoryNumber(String relativePath) { 186 String numStr = relativePath.replace("/", ""); 187 if (relativePath.isEmpty()) { 188 return 0; 189 } 190 if (numStr.length() > 1) { 191 // undo step from getRelativePath() to reuse 0th sub directory 192 String firstChar = Integer.toString( 193 Integer.parseInt(numStr.substring(0, 1), 194 DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL); 195 numStr = firstChar + numStr.substring(1); 196 } 197 return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1; 198 } 199 Directory(int directoryNo)200 public Directory(int directoryNo) { 201 fileCount = 0; 202 relativePath = getRelativePath(directoryNo); 203 } 204 incrementAndGetCount()205 public int incrementAndGetCount() { 206 return ++fileCount; 207 } 208 decrementAndGetCount()209 public int decrementAndGetCount() { 210 return --fileCount; 211 } 212 getRelativePath()213 public String getRelativePath() { 214 return relativePath; 215 } 216 getCount()217 public int getCount() { 218 return fileCount; 219 } 220 } 221 }