1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import java.io.File;
22 import java.io.IOException;
23 import java.net.MalformedURLException;
24 import java.net.URISyntaxException;
25 import java.net.URL;
26 import java.net.URLClassLoader;
27 import java.security.AccessController;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.HashMap;
31 import java.util.LinkedHashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.atomic.AtomicLong;
42 
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.fs.FileContext;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.FileUtil;
48 import org.apache.hadoop.fs.LocalDirAllocator;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.mapreduce.MRConfig;
51 import org.apache.hadoop.mapreduce.MRJobConfig;
52 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
53 import org.apache.hadoop.mapreduce.v2.util.MRApps;
54 import org.apache.hadoop.security.UserGroupInformation;
55 import org.apache.hadoop.util.StringUtils;
56 import org.apache.hadoop.yarn.api.records.LocalResource;
57 import org.apache.hadoop.yarn.api.records.LocalResourceType;
58 import org.apache.hadoop.yarn.util.ConverterUtils;
59 import org.apache.hadoop.yarn.util.FSDownload;
60 
61 import com.google.common.collect.Maps;
62 import com.google.common.util.concurrent.ThreadFactoryBuilder;
63 
64 /**
65  * A helper class for managing the distributed cache for {@link LocalJobRunner}.
66  */
67 @SuppressWarnings("deprecation")
68 class LocalDistributedCacheManager {
69   public static final Log LOG =
70     LogFactory.getLog(LocalDistributedCacheManager.class);
71 
72   private List<String> localArchives = new ArrayList<String>();
73   private List<String> localFiles = new ArrayList<String>();
74   private List<String> localClasspaths = new ArrayList<String>();
75 
76   private List<File> symlinksCreated = new ArrayList<File>();
77 
78   private boolean setupCalled = false;
79 
80   /**
81    * Set up the distributed cache by localizing the resources, and updating
82    * the configuration with references to the localized resources.
83    * @param conf
84    * @throws IOException
85    */
setup(JobConf conf)86   public void setup(JobConf conf) throws IOException {
87     File workDir = new File(System.getProperty("user.dir"));
88 
89     // Generate YARN local resources objects corresponding to the distributed
90     // cache configuration
91     Map<String, LocalResource> localResources =
92       new LinkedHashMap<String, LocalResource>();
93     MRApps.setupDistributedCache(conf, localResources);
94     // Generating unique numbers for FSDownload.
95     AtomicLong uniqueNumberGenerator =
96         new AtomicLong(System.currentTimeMillis());
97 
98     // Find which resources are to be put on the local classpath
99     Map<String, Path> classpaths = new HashMap<String, Path>();
100     Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
101     if (archiveClassPaths != null) {
102       for (Path p : archiveClassPaths) {
103         classpaths.put(p.toUri().getPath().toString(), p);
104       }
105     }
106     Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
107     if (fileClassPaths != null) {
108       for (Path p : fileClassPaths) {
109         classpaths.put(p.toUri().getPath().toString(), p);
110       }
111     }
112 
113     // Localize the resources
114     LocalDirAllocator localDirAllocator =
115       new LocalDirAllocator(MRConfig.LOCAL_DIR);
116     FileContext localFSFileContext = FileContext.getLocalFSFileContext();
117     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
118 
119     ExecutorService exec = null;
120     try {
121       ThreadFactory tf = new ThreadFactoryBuilder()
122       .setNameFormat("LocalDistributedCacheManager Downloader #%d")
123       .build();
124       exec = Executors.newCachedThreadPool(tf);
125       Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
126       Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
127       for (LocalResource resource : localResources.values()) {
128         Callable<Path> download =
129             new FSDownload(localFSFileContext, ugi, conf, new Path(destPath,
130                 Long.toString(uniqueNumberGenerator.incrementAndGet())),
131                 resource);
132         Future<Path> future = exec.submit(download);
133         resourcesToPaths.put(resource, future);
134       }
135       for (Entry<String, LocalResource> entry : localResources.entrySet()) {
136         LocalResource resource = entry.getValue();
137         Path path;
138         try {
139           path = resourcesToPaths.get(resource).get();
140         } catch (InterruptedException e) {
141           throw new IOException(e);
142         } catch (ExecutionException e) {
143           throw new IOException(e);
144         }
145         String pathString = path.toUri().toString();
146         String link = entry.getKey();
147         String target = new File(path.toUri()).getPath();
148         symlink(workDir, target, link);
149 
150         if (resource.getType() == LocalResourceType.ARCHIVE) {
151           localArchives.add(pathString);
152         } else if (resource.getType() == LocalResourceType.FILE) {
153           localFiles.add(pathString);
154         } else if (resource.getType() == LocalResourceType.PATTERN) {
155           //PATTERN is not currently used in local mode
156           throw new IllegalArgumentException("Resource type PATTERN is not " +
157           		"implemented yet. " + resource.getResource());
158         }
159         Path resourcePath;
160         try {
161           resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
162         } catch (URISyntaxException e) {
163           throw new IOException(e);
164         }
165         LOG.info(String.format("Localized %s as %s", resourcePath, path));
166         String cp = resourcePath.toUri().getPath();
167         if (classpaths.keySet().contains(cp)) {
168           localClasspaths.add(path.toUri().getPath().toString());
169         }
170       }
171     } finally {
172       if (exec != null) {
173         exec.shutdown();
174       }
175     }
176     // Update the configuration object with localized data.
177     if (!localArchives.isEmpty()) {
178       conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
179           .arrayToString(localArchives.toArray(new String[localArchives
180               .size()])));
181     }
182     if (!localFiles.isEmpty()) {
183       conf.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
184           .arrayToString(localFiles.toArray(new String[localArchives
185               .size()])));
186     }
187     setupCalled = true;
188   }
189 
190   /**
191    * Utility method for creating a symlink and warning on errors.
192    *
193    * If link is null, does nothing.
194    */
symlink(File workDir, String target, String link)195   private void symlink(File workDir, String target, String link)
196       throws IOException {
197     if (link != null) {
198       link = workDir.toString() + Path.SEPARATOR + link;
199       File flink = new File(link);
200       if (!flink.exists()) {
201         LOG.info(String.format("Creating symlink: %s <- %s", target, link));
202         if (0 != FileUtil.symLink(target, link)) {
203           LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
204               link));
205         } else {
206           symlinksCreated.add(new File(link));
207         }
208       }
209     }
210   }
211 
212   /**
213    * Are the resources that should be added to the classpath?
214    * Should be called after setup().
215    *
216    */
hasLocalClasspaths()217   public boolean hasLocalClasspaths() {
218     if (!setupCalled) {
219       throw new IllegalStateException(
220           "hasLocalClasspaths() should be called after setup()");
221     }
222     return !localClasspaths.isEmpty();
223   }
224 
225   /**
226    * Creates a class loader that includes the designated
227    * files and archives.
228    */
makeClassLoader(final ClassLoader parent)229   public ClassLoader makeClassLoader(final ClassLoader parent)
230       throws MalformedURLException {
231     final URL[] urls = new URL[localClasspaths.size()];
232     for (int i = 0; i < localClasspaths.size(); ++i) {
233       urls[i] = new File(localClasspaths.get(i)).toURI().toURL();
234       LOG.info(urls[i]);
235     }
236     return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
237       @Override
238       public ClassLoader run() {
239         return new URLClassLoader(urls, parent);
240       }
241     });
242   }
243 
244   public void close() throws IOException {
245     for (File symlink : symlinksCreated) {
246       if (!symlink.delete()) {
247         LOG.warn("Failed to delete symlink created by the local job runner: " +
248             symlink);
249       }
250     }
251     FileContext localFSFileContext = FileContext.getLocalFSFileContext();
252     for (String archive : localArchives) {
253       localFSFileContext.delete(new Path(archive), true);
254     }
255     for (String file : localFiles) {
256       localFSFileContext.delete(new Path(file), true);
257     }
258   }
259 }
260