1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import java.io.File; 22 import java.io.IOException; 23 import java.net.MalformedURLException; 24 import java.net.URISyntaxException; 25 import java.net.URL; 26 import java.net.URLClassLoader; 27 import java.security.AccessController; 28 import java.security.PrivilegedAction; 29 import java.util.ArrayList; 30 import java.util.HashMap; 31 import java.util.LinkedHashMap; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.Map.Entry; 35 import java.util.concurrent.Callable; 36 import java.util.concurrent.ExecutionException; 37 import java.util.concurrent.ExecutorService; 38 import java.util.concurrent.Executors; 39 import java.util.concurrent.Future; 40 import java.util.concurrent.ThreadFactory; 41 import java.util.concurrent.atomic.AtomicLong; 42 43 import org.apache.commons.logging.Log; 44 import org.apache.commons.logging.LogFactory; 45 import org.apache.hadoop.fs.FileContext; 46 import org.apache.hadoop.fs.FileSystem; 47 import org.apache.hadoop.fs.FileUtil; 48 import org.apache.hadoop.fs.LocalDirAllocator; 49 import org.apache.hadoop.fs.Path; 50 import org.apache.hadoop.mapreduce.MRConfig; 51 import org.apache.hadoop.mapreduce.MRJobConfig; 52 import org.apache.hadoop.mapreduce.filecache.DistributedCache; 53 import org.apache.hadoop.mapreduce.v2.util.MRApps; 54 import org.apache.hadoop.security.UserGroupInformation; 55 import org.apache.hadoop.util.StringUtils; 56 import org.apache.hadoop.yarn.api.records.LocalResource; 57 import org.apache.hadoop.yarn.api.records.LocalResourceType; 58 import org.apache.hadoop.yarn.util.ConverterUtils; 59 import org.apache.hadoop.yarn.util.FSDownload; 60 61 import com.google.common.collect.Maps; 62 import com.google.common.util.concurrent.ThreadFactoryBuilder; 63 64 /** 65 * A helper class for managing the distributed cache for {@link LocalJobRunner}. 66 */ 67 @SuppressWarnings("deprecation") 68 class LocalDistributedCacheManager { 69 public static final Log LOG = 70 LogFactory.getLog(LocalDistributedCacheManager.class); 71 72 private List<String> localArchives = new ArrayList<String>(); 73 private List<String> localFiles = new ArrayList<String>(); 74 private List<String> localClasspaths = new ArrayList<String>(); 75 76 private List<File> symlinksCreated = new ArrayList<File>(); 77 78 private boolean setupCalled = false; 79 80 /** 81 * Set up the distributed cache by localizing the resources, and updating 82 * the configuration with references to the localized resources. 83 * @param conf 84 * @throws IOException 85 */ setup(JobConf conf)86 public void setup(JobConf conf) throws IOException { 87 File workDir = new File(System.getProperty("user.dir")); 88 89 // Generate YARN local resources objects corresponding to the distributed 90 // cache configuration 91 Map<String, LocalResource> localResources = 92 new LinkedHashMap<String, LocalResource>(); 93 MRApps.setupDistributedCache(conf, localResources); 94 // Generating unique numbers for FSDownload. 95 AtomicLong uniqueNumberGenerator = 96 new AtomicLong(System.currentTimeMillis()); 97 98 // Find which resources are to be put on the local classpath 99 Map<String, Path> classpaths = new HashMap<String, Path>(); 100 Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf); 101 if (archiveClassPaths != null) { 102 for (Path p : archiveClassPaths) { 103 classpaths.put(p.toUri().getPath().toString(), p); 104 } 105 } 106 Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf); 107 if (fileClassPaths != null) { 108 for (Path p : fileClassPaths) { 109 classpaths.put(p.toUri().getPath().toString(), p); 110 } 111 } 112 113 // Localize the resources 114 LocalDirAllocator localDirAllocator = 115 new LocalDirAllocator(MRConfig.LOCAL_DIR); 116 FileContext localFSFileContext = FileContext.getLocalFSFileContext(); 117 UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); 118 119 ExecutorService exec = null; 120 try { 121 ThreadFactory tf = new ThreadFactoryBuilder() 122 .setNameFormat("LocalDistributedCacheManager Downloader #%d") 123 .build(); 124 exec = Executors.newCachedThreadPool(tf); 125 Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); 126 Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap(); 127 for (LocalResource resource : localResources.values()) { 128 Callable<Path> download = 129 new FSDownload(localFSFileContext, ugi, conf, new Path(destPath, 130 Long.toString(uniqueNumberGenerator.incrementAndGet())), 131 resource); 132 Future<Path> future = exec.submit(download); 133 resourcesToPaths.put(resource, future); 134 } 135 for (Entry<String, LocalResource> entry : localResources.entrySet()) { 136 LocalResource resource = entry.getValue(); 137 Path path; 138 try { 139 path = resourcesToPaths.get(resource).get(); 140 } catch (InterruptedException e) { 141 throw new IOException(e); 142 } catch (ExecutionException e) { 143 throw new IOException(e); 144 } 145 String pathString = path.toUri().toString(); 146 String link = entry.getKey(); 147 String target = new File(path.toUri()).getPath(); 148 symlink(workDir, target, link); 149 150 if (resource.getType() == LocalResourceType.ARCHIVE) { 151 localArchives.add(pathString); 152 } else if (resource.getType() == LocalResourceType.FILE) { 153 localFiles.add(pathString); 154 } else if (resource.getType() == LocalResourceType.PATTERN) { 155 //PATTERN is not currently used in local mode 156 throw new IllegalArgumentException("Resource type PATTERN is not " + 157 "implemented yet. " + resource.getResource()); 158 } 159 Path resourcePath; 160 try { 161 resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource()); 162 } catch (URISyntaxException e) { 163 throw new IOException(e); 164 } 165 LOG.info(String.format("Localized %s as %s", resourcePath, path)); 166 String cp = resourcePath.toUri().getPath(); 167 if (classpaths.keySet().contains(cp)) { 168 localClasspaths.add(path.toUri().getPath().toString()); 169 } 170 } 171 } finally { 172 if (exec != null) { 173 exec.shutdown(); 174 } 175 } 176 // Update the configuration object with localized data. 177 if (!localArchives.isEmpty()) { 178 conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils 179 .arrayToString(localArchives.toArray(new String[localArchives 180 .size()]))); 181 } 182 if (!localFiles.isEmpty()) { 183 conf.set(MRJobConfig.CACHE_LOCALFILES, StringUtils 184 .arrayToString(localFiles.toArray(new String[localArchives 185 .size()]))); 186 } 187 setupCalled = true; 188 } 189 190 /** 191 * Utility method for creating a symlink and warning on errors. 192 * 193 * If link is null, does nothing. 194 */ symlink(File workDir, String target, String link)195 private void symlink(File workDir, String target, String link) 196 throws IOException { 197 if (link != null) { 198 link = workDir.toString() + Path.SEPARATOR + link; 199 File flink = new File(link); 200 if (!flink.exists()) { 201 LOG.info(String.format("Creating symlink: %s <- %s", target, link)); 202 if (0 != FileUtil.symLink(target, link)) { 203 LOG.warn(String.format("Failed to create symlink: %s <- %s", target, 204 link)); 205 } else { 206 symlinksCreated.add(new File(link)); 207 } 208 } 209 } 210 } 211 212 /** 213 * Are the resources that should be added to the classpath? 214 * Should be called after setup(). 215 * 216 */ hasLocalClasspaths()217 public boolean hasLocalClasspaths() { 218 if (!setupCalled) { 219 throw new IllegalStateException( 220 "hasLocalClasspaths() should be called after setup()"); 221 } 222 return !localClasspaths.isEmpty(); 223 } 224 225 /** 226 * Creates a class loader that includes the designated 227 * files and archives. 228 */ makeClassLoader(final ClassLoader parent)229 public ClassLoader makeClassLoader(final ClassLoader parent) 230 throws MalformedURLException { 231 final URL[] urls = new URL[localClasspaths.size()]; 232 for (int i = 0; i < localClasspaths.size(); ++i) { 233 urls[i] = new File(localClasspaths.get(i)).toURI().toURL(); 234 LOG.info(urls[i]); 235 } 236 return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { 237 @Override 238 public ClassLoader run() { 239 return new URLClassLoader(urls, parent); 240 } 241 }); 242 } 243 244 public void close() throws IOException { 245 for (File symlink : symlinksCreated) { 246 if (!symlink.delete()) { 247 LOG.warn("Failed to delete symlink created by the local job runner: " + 248 symlink); 249 } 250 } 251 FileContext localFSFileContext = FileContext.getLocalFSFileContext(); 252 for (String archive : localArchives) { 253 localFSFileContext.delete(new Path(archive), true); 254 } 255 for (String file : localFiles) { 256 localFSFileContext.delete(new Path(file), true); 257 } 258 } 259 } 260