/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.ContainerRollingLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;

/**
 * Helper class for MR applications
 */
@Private
@Unstable
public class MRApps extends Apps {
  public static final Log LOG = LogFactory.getLog(MRApps.class);

  public static String toString(JobId jid) {
    return jid.toString();
  }

  public static JobId toJobID(String jid) {
    return TypeConverter.toYarn(JobID.forName(jid));
  }

  public static String toString(TaskId tid) {
    return tid.toString();
  }

  public static TaskId toTaskID(String tid) {
    return TypeConverter.toYarn(TaskID.forName(tid));
  }

  public static String toString(TaskAttemptId taid) {
    return taid.toString();
  }

  public static TaskAttemptId toTaskAttemptID(String taid) {
    return TypeConverter.toYarn(TaskAttemptID.forName(taid));
  }

  public static String taskSymbol(TaskType type) {
    switch (type) {
      case MAP:           return "m";
      case REDUCE:        return "r";
    }
    throw new YarnRuntimeException("Unknown task type: " + type.toString());
  }

  public static enum TaskAttemptStateUI {
    NEW(
        new TaskAttemptState[] { TaskAttemptState.NEW,
            TaskAttemptState.STARTING }),
    RUNNING(
        new TaskAttemptState[] { TaskAttemptState.RUNNING,
            TaskAttemptState.COMMIT_PENDING }),
    SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}),
    FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}),
    KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED});

    private final List<TaskAttemptState> correspondingStates;

    private TaskAttemptStateUI(TaskAttemptState[] correspondingStates) {
      this.correspondingStates = Arrays.asList(correspondingStates);
    }

    public boolean correspondsTo(TaskAttemptState state) {
      return this.correspondingStates.contains(state);
    }
  }

  public static enum TaskStateUI {
    RUNNING(
        new TaskState[]{TaskState.RUNNING}),
    PENDING(new TaskState[]{TaskState.SCHEDULED}),
    COMPLETED(new TaskState[]{TaskState.SUCCEEDED, TaskState.FAILED, TaskState.KILLED});

    private final List<TaskState> correspondingStates;

    private TaskStateUI(TaskState[] correspondingStates) {
      this.correspondingStates = Arrays.asList(correspondingStates);
    }

    public boolean correspondsTo(TaskState state) {
      return this.correspondingStates.contains(state);
    }
  }

  public static TaskType taskType(String symbol) {
    // JDK 7 supports switch on strings
    if (symbol.equals("m")) return TaskType.MAP;
    if (symbol.equals("r")) return TaskType.REDUCE;
    throw new YarnRuntimeException("Unknown task symbol: " + symbol);
  }
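
  // Illustrative round trip between task types and their symbols (a sketch,
  // not part of the class):
  //   taskType(taskSymbol(TaskType.MAP)) == TaskType.MAP
  //   taskSymbol(TaskType.REDUCE).equals("r")
  // An unknown symbol such as "x" raises a YarnRuntimeException.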

  public static TaskAttemptStateUI taskAttemptState(String attemptStateStr) {
    return TaskAttemptStateUI.valueOf(attemptStateStr);
  }

  public static TaskStateUI taskState(String taskStateStr) {
    return TaskStateUI.valueOf(taskStateStr);
  }

  // gets the base name of the MapReduce framework or null if no
  // framework was configured
  private static String getMRFrameworkName(Configuration conf) {
    String frameworkName = null;
    String framework =
        conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
    if (!framework.isEmpty()) {
      URI uri;
      try {
        uri = new URI(framework);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Unable to parse '" + framework
            + "' as a URI, check the setting for "
            + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
      }

      frameworkName = uri.getFragment();
      if (frameworkName == null) {
        frameworkName = new Path(uri).getName();
      }
    }
    return frameworkName;
  }
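
  // Example (hypothetical values): with
  // mapreduce.application.framework.path set to
  // "hdfs:///mr/framework/hadoop-mapreduce.tar.gz#mrframework", the URI
  // fragment wins and the name is "mrframework"; without a fragment the
  // name falls back to the last path component, "hadoop-mapreduce.tar.gz".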

  private static void setMRFrameworkClasspath(
      Map<String, String> environment, Configuration conf) throws IOException {
    // Propagate the system classpath when using the mini cluster
    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
      MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
          System.getProperty("java.class.path"), conf);
    }
    boolean crossPlatform =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
          MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);

    // if the framework is specified then only use the MR classpath
    String frameworkName = getMRFrameworkName(conf);
    if (frameworkName == null) {
      // Add standard Hadoop classes
      for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
          crossPlatform
              ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
              : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
        MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
          c.trim(), conf);
      }
    }

    boolean foundFrameworkInClasspath = (frameworkName == null);
    for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
        crossPlatform ?
            StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
            : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
      MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
        c.trim(), conf);
      if (!foundFrameworkInClasspath) {
        foundFrameworkInClasspath = c.contains(frameworkName);
      }
    }

    if (!foundFrameworkInClasspath) {
      throw new IllegalArgumentException(
          "Could not locate MapReduce framework name '" + frameworkName
          + "' in " + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
    }
    // TODO: Remove duplicates.
  }

  @SuppressWarnings("deprecation")
  public static void setClasspath(Map<String, String> environment,
      Configuration conf) throws IOException {
    boolean userClassesTakesPrecedence =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

    String classpathEnvVar =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
        ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();

    String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name();

    MRApps.addToEnvironment(environment,
      classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);

    MRApps.addToEnvironment(environment,
        hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD),
        conf);

    if (!userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf);
    }

    addClasspathToEnv(environment, classpathEnvVar, conf);
    addClasspathToEnv(environment, hadoopClasspathEnvVar, conf);

    if (userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf);
    }
  }
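
  // Usage sketch for setClasspath (caller side, hypothetical):
  //   Map<String, String> env = new HashMap<String, String>();
  //   MRApps.setClasspath(env, jobConf);
  //   // env's CLASSPATH now begins with $PWD followed by the job-jar
  //   // entries, with the MR framework classpath placed before or after
  //   // the user classes depending on mapreduce.job.user.classpath.first.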

  @SuppressWarnings("deprecation")
  public static void addClasspathToEnv(Map<String, String> environment,
      String classpathEnvVar, Configuration conf) throws IOException {
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
        conf);

    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
        conf);

    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*",
        conf);

    // a * in the classpath will only find a .jar, so we need to filter out
    // all .jars and add everything else
    addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
        DistributedCache.getCacheFiles(conf),
        conf,
        environment, classpathEnvVar);
    addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
        DistributedCache.getCacheArchives(conf),
        conf,
        environment, classpathEnvVar);
  }

  /**
   * Add the paths to the classpath if they are not jars
   * @param paths the paths to add to the classpath
   * @param withLinks the corresponding paths that may have a link name in them
   * @param conf used to resolve the paths
   * @param environment the environment to update CLASSPATH in
   * @param classpathEnvVar the environment variable to update
   * @throws IOException if there is an error resolving any of the paths.
   */
  private static void addToClasspathIfNotJar(Path[] paths,
      URI[] withLinks, Configuration conf,
      Map<String, String> environment,
      String classpathEnvVar) throws IOException {
    if (paths != null) {
      HashMap<Path, String> linkLookup = new HashMap<Path, String>();
      if (withLinks != null) {
        for (URI u: withLinks) {
          Path p = new Path(u);
          FileSystem remoteFS = p.getFileSystem(conf);
          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
              remoteFS.getWorkingDirectory()));
          String name = (null == u.getFragment())
              ? p.getName() : u.getFragment();
          if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
            linkLookup.put(p, name);
          }
        }
      }

      for (Path p : paths) {
        FileSystem remoteFS = p.getFileSystem(conf);
        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory()));
        String name = linkLookup.get(p);
        if (name == null) {
          name = p.getName();
        }
        if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
          MRApps.addToEnvironment(
              environment,
              classpathEnvVar,
              crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + name, conf);
        }
      }
    }
  }
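
  // Sketch of the filtering above (hypothetical cache entries): a file
  // localized as "hdfs:///libs/util.jar" is skipped because the $PWD/*
  // wildcard already matches jars, while "hdfs:///conf/site.xml#app-conf"
  // is appended to the classpath as $PWD/app-conf via its fragment link
  // name.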

  /**
   * Creates and sets an {@link ApplicationClassLoader} on the given
   * configuration and as the thread context classloader, if
   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
   * the APP_CLASSPATH environment variable is set.
   * @param conf the configuration to read and update
   * @throws IOException
   */
  public static void setJobClassLoader(Configuration conf)
      throws IOException {
    setClassLoader(createJobClassLoader(conf), conf);
  }
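
  // Usage sketch for setJobClassLoader (hypothetical AM/task startup code,
  // for illustration only):
  //   JobConf conf = new JobConf();
  //   conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
  //   MRApps.setJobClassLoader(conf); // no-op unless APP_CLASSPATH is set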

  /**
   * Creates an {@link ApplicationClassLoader} if
   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
   * the APP_CLASSPATH environment variable is set.
   * @param conf the configuration to read
   * @return the created job classloader, or null if the job classloader is not
   * enabled or the APP_CLASSPATH environment variable is not set
   * @throws IOException
   */
  public static ClassLoader createJobClassLoader(Configuration conf)
      throws IOException {
    ClassLoader jobClassLoader = null;
    if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
      String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
      if (appClasspath == null) {
        LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
      } else {
        LOG.info("Creating job classloader");
        if (LOG.isDebugEnabled()) {
          LOG.debug("APP_CLASSPATH=" + appClasspath);
        }
        String[] systemClasses = getSystemClasses(conf);
        jobClassLoader = createJobClassLoader(appClasspath,
            systemClasses);
      }
    }
    return jobClassLoader;
  }

  /**
   * Sets the provided classloader on the given configuration and as the thread
   * context classloader if the classloader is not null.
   * @param classLoader the classloader to set
   * @param conf the configuration to update
   */
  public static void setClassLoader(ClassLoader classLoader,
      Configuration conf) {
    if (classLoader != null) {
      LOG.info("Setting classloader " + classLoader.getClass().getName() +
          " on the configuration and as the thread context classloader");
      conf.setClassLoader(classLoader);
      Thread.currentThread().setContextClassLoader(classLoader);
    }
  }

  @VisibleForTesting
  static String[] getSystemClasses(Configuration conf) {
    return conf.getTrimmedStrings(
        MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
  }

  private static ClassLoader createJobClassLoader(final String appClasspath,
      final String[] systemClasses) throws IOException {
    try {
      return AccessController.doPrivileged(
        new PrivilegedExceptionAction<ClassLoader>() {
          @Override
          public ClassLoader run() throws MalformedURLException {
            return new ApplicationClassLoader(appClasspath,
                MRApps.class.getClassLoader(), Arrays.asList(systemClasses));
          }
      });
    } catch (PrivilegedActionException e) {
      Throwable t = e.getCause();
      if (t instanceof MalformedURLException) {
        throw (MalformedURLException) t;
      }
      throw new IOException(e);
    }
  }

  private static final String STAGING_CONSTANT = ".staging";
  public static Path getStagingAreaDir(Configuration conf, String user) {
    return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
        MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
        + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
  }
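
  // Example (assuming the default value of yarn.app.mapreduce.am.staging-dir,
  // "/tmp/hadoop-yarn/staging"): getStagingAreaDir(conf, "alice") returns
  // "/tmp/hadoop-yarn/staging/alice/.staging".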

  public static String getJobFile(Configuration conf, String user,
      org.apache.hadoop.mapreduce.JobID jobId) {
    Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
    return jobFile.toString();
  }

  public static Path getEndJobCommitSuccessFile(Configuration conf, String user,
      JobId jobId) {
    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS");
    return endCommitFile;
  }

  public static Path getEndJobCommitFailureFile(Configuration conf, String user,
      JobId jobId) {
    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL");
    return endCommitFile;
  }

  public static Path getStartJobCommitFile(Configuration conf, String user,
      JobId jobId) {
    Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED");
    return startCommitFile;
  }

  @SuppressWarnings("deprecation")
  public static void setupDistributedCache(
      Configuration conf,
      Map<String, LocalResource> localResources)
  throws IOException {

    // Cache archives
    parseDistributedCacheArtifacts(conf, localResources,
        LocalResourceType.ARCHIVE,
        DistributedCache.getCacheArchives(conf),
        DistributedCache.getArchiveTimestamps(conf),
        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
        DistributedCache.getArchiveVisibilities(conf));

    // Cache files
    parseDistributedCacheArtifacts(conf,
        localResources,
        LocalResourceType.FILE,
        DistributedCache.getCacheFiles(conf),
        DistributedCache.getFileTimestamps(conf),
        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
        DistributedCache.getFileVisibilities(conf));
  }
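
  // Usage sketch for setupDistributedCache (hypothetical container-launch
  // code):
  //   Map<String, LocalResource> localResources =
  //       new HashMap<String, LocalResource>();
  //   MRApps.setupDistributedCache(jobConf, localResources);
  //   // localResources now maps each cache file/archive link name to a
  //   // LocalResource carrying its URL, type, size, timestamp and visibility.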

  /**
   * Set up the DistributedCache related configs to make
   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
   * and
   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
   * work.
   * @param conf the configuration to update
   * @throws java.io.IOException
   */
  @SuppressWarnings("deprecation")
  public static void setupDistributedCacheLocal(Configuration conf)
      throws IOException {

    String localWorkDir = System.getenv("PWD");
    // all symlinks are created in the current work-dir

    // Update the configuration object with localized archives.
    URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
    if (cacheArchives != null) {
      List<String> localArchives = new ArrayList<String>();
      for (int i = 0; i < cacheArchives.length; ++i) {
        URI u = cacheArchives[i];
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName()
                : u.getFragment());
        String linkName = name.toUri().getPath();
        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
      }
      if (!localArchives.isEmpty()) {
        conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
            .arrayToString(localArchives.toArray(new String[localArchives
                .size()])));
      }
    }

    // Update the configuration object with localized files.
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    if (cacheFiles != null) {
      List<String> localFiles = new ArrayList<String>();
      for (int i = 0; i < cacheFiles.length; ++i) {
        URI u = cacheFiles[i];
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName()
                : u.getFragment());
        String linkName = name.toUri().getPath();
        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
      }
      if (!localFiles.isEmpty()) {
        conf.set(MRJobConfig.CACHE_LOCALFILES,
            StringUtils.arrayToString(localFiles
                .toArray(new String[localFiles.size()])));
      }
    }
  }
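
  // Example (hypothetical entry): with PWD=/container/workdir and a cache
  // file "hdfs:///data/dict.txt#words", mapreduce.job.cache.local.files
  // gains the entry "/container/workdir/words".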

  private static String getResourceDescription(LocalResourceType type) {
    if (type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
    }
    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
  }

  private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
    StringBuffer b = new StringBuffer();
    b.append(url.getScheme()).append("://").append(url.getHost());
    if (url.getPort() >= 0) {
      b.append(":").append(url.getPort());
    }
    b.append(url.getFile());
    return b.toString();
  }

  // TODO - Move this to MR!
  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
  // long[], boolean[], Path[], FileType)
  @SuppressWarnings("deprecation")
  private static void parseDistributedCacheArtifacts(
      Configuration conf,
      Map<String, LocalResource> localResources,
      LocalResourceType type,
      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
  throws IOException {

    if (uris != null) {
      // Sanity check
      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
          (uris.length != visibilities.length)) {
        throw new IllegalArgumentException("Invalid specification for " +
            "distributed-cache artifacts of type " + type + ":" +
            " #uris=" + uris.length +
            " #timestamps=" + timestamps.length +
            " #sizes=" + sizes.length +
            " #visibilities=" + visibilities.length
            );
      }

      for (int i = 0; i < uris.length; ++i) {
        URI u = uris[i];
        Path p = new Path(u);
        FileSystem remoteFS = p.getFileSystem(conf);
        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory()));
        // Add URI fragment or just the filename
        Path name = new Path((null == u.getFragment())
          ? p.getName()
          : u.getFragment());
        if (name.isAbsolute()) {
          throw new IllegalArgumentException("Resource name must be relative");
        }
        String linkName = name.toUri().getPath();
        LocalResource orig = localResources.get(linkName);
        org.apache.hadoop.yarn.api.records.URL url =
          ConverterUtils.getYarnUrlFromURI(p.toUri());
        if (orig != null && !orig.getResource().equals(url)) {
          LOG.warn(
              getResourceDescription(orig.getType()) +
              toString(orig.getResource()) + " conflicts with " +
              getResourceDescription(type) + toString(url) +
              " This will be an error in Hadoop 2.0");
          continue;
        }
        localResources.put(linkName, LocalResource.newInstance(ConverterUtils
          .getYarnUrlFromURI(p.toUri()), type, visibilities[i]
            ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
          sizes[i], timestamps[i]));
      }
    }
  }

  // TODO - Move this to MR!
  private static long[] getFileSizes(Configuration conf, String key) {
    String[] strs = conf.getStrings(key);
    if (strs == null) {
      return null;
    }
    long[] result = new long[strs.length];
    for (int i = 0; i < strs.length; ++i) {
      result[i] = Long.parseLong(strs[i]);
    }
    return result;
  }

  public static String getChildLogLevel(Configuration conf, boolean isMap) {
    if (isMap) {
      return conf.get(
          MRJobConfig.MAP_LOG_LEVEL,
          JobConf.DEFAULT_LOG_LEVEL.toString()
      );
    } else {
      return conf.get(
          MRJobConfig.REDUCE_LOG_LEVEL,
          JobConf.DEFAULT_LOG_LEVEL.toString()
      );
    }
  }

  /**
   * Add the JVM system properties necessary to configure
   *  {@link ContainerLogAppender} or
   *  {@link ContainerRollingLogAppender}.
   *
   * @param task for map/reduce, or null for app master
   * @param vargs the argument list to append to
   * @param conf configuration of MR job
   */
  public static void addLog4jSystemProperties(Task task,
      List<String> vargs, Configuration conf) {
    String log4jPropertyFile =
        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
    if (log4jPropertyFile.isEmpty()) {
      vargs.add("-Dlog4j.configuration=container-log4j.properties");
    } else {
      URI log4jURI = null;
      try {
        log4jURI = new URI(log4jPropertyFile);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }
      Path log4jPath = new Path(log4jURI);
      vargs.add("-Dlog4j.configuration=" + log4jPath.getName());
    }

    long logSize;
    String logLevel;
    int numBackups;

    if (task == null) {
      logSize = conf.getLong(MRJobConfig.MR_AM_LOG_KB,
          MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10;
      logLevel = conf.get(
          MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
      numBackups = conf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
          MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
    } else {
      logSize = TaskLog.getTaskLogLimitBytes(conf);
      logLevel = getChildLogLevel(conf, task.isMapTask());
      numBackups = conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
          MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
    }

    vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
        ApplicationConstants.LOG_DIR_EXPANSION_VAR);
    vargs.add(
        "-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize);

    if (logSize > 0L && numBackups > 0) {
      // log should be rolled
      vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "="
          + numBackups);
      vargs.add("-Dhadoop.root.logger=" + logLevel + ",CRLA");
    } else {
      vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
    }
    vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);

    if (task != null
        && !task.isMapTask()
        && conf.getBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG,
               MRJobConfig.DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG)) {
      final int numShuffleBackups = conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS,
          MRJobConfig.DEFAULT_SHUFFLE_LOG_BACKUPS);
      final long shuffleLogSize = conf.getLong(MRJobConfig.SHUFFLE_LOG_KB,
          MRJobConfig.DEFAULT_SHUFFLE_LOG_KB) << 10;
      final String shuffleLogger = logLevel
          + (shuffleLogSize > 0L && numShuffleBackups > 0
                 ? ",shuffleCRLA"
                 : ",shuffleCLA");

      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.logger=" + shuffleLogger);
      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.logfile=" + TaskLog.LogName.SYSLOG + ".shuffle");
      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.log.filesize=" + shuffleLogSize);
      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.log.backups=" + numShuffleBackups);
    }
  }
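
  // Example of the JVM arguments appended above for an AM (task == null)
  // with default settings (a sketch; exact values depend on conf):
  //   -Dlog4j.configuration=container-log4j.properties
  //   -Dyarn.app.container.log.dir=<LOG_DIR>
  //   -Dyarn.app.container.log.filesize=<limit in bytes>
  //   -Dhadoop.root.logger=INFO,CLA
  //   -Dhadoop.root.logfile=syslog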

  public static void setEnvFromInputString(Map<String, String> env,
      String envString, Configuration conf) {
    String classPathSeparator =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
          MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
            ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
    Apps.setEnvFromInputString(env, envString, classPathSeparator);
  }

  @Public
  @Unstable
  public static void addToEnvironment(Map<String, String> environment,
      String variable, String value, Configuration conf) {
    String classPathSeparator =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
          MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
            ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
    Apps.addToEnvironment(environment, variable, value, classPathSeparator);
  }

  public static String crossPlatformifyMREnv(Configuration conf, Environment env) {
    boolean crossPlatform =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
            MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
    return crossPlatform ? env.$$() : env.$();
  }
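
  // Example: crossPlatformifyMREnv(conf, Environment.PWD) returns the
  // cross-platform marker "{{PWD}}" (expanded by YARN on the node) when
  // cross-platform submission is enabled, and a platform-local form such
  // as "$PWD" otherwise.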
}