/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;


import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 *
 * <p><code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>; however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straightforward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), others interact subtly
 *   with the rest of the framework and/or job configuration and are relatively
 *   more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol>
 *
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job such as <code>Comparator</code>s to be used, files to be put in
 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
 * are to be compressed (and how), and debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for post-processing task logs, the tasks' stdout, stderr, and syslog,
 * etc.</p>
 *
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  static{
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
    "mapred.task.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
    "mapred.task.limit.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
    "mapred.task.default.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
    "mapred.task.maxpmem";

  /**
   * A value which, if set for memory-related configuration options,
   * indicates that the options are turned off.
   * Deprecated because it makes no sense in the context of MR2.
   */
  @Deprecated
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is specified.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY =
    JobContext.REDUCE_MEMORY_MB;

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      "mapred.job.map.memory.mb";

  /**
   * The variable is kept for M/R 1.x applications, while M/R 2.x applications
   * should use {@link #MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      "mapred.job.reduce.memory.mb";

  /** Pattern for the default unpacking behavior for job jars */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
    Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
    JobContext.MAP_JAVA_OPTS;
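
  // Illustrative sketch (not part of this class's API): setting per-map-task
  // JVM options with @taskid@ interpolation, as described above. The opts
  // string is the example value from the javadoc, not a recommended default;
  // conf is assumed to be a JobConf instance.
  //
  //   conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
  //            "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");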

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
    JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
    "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This inherits the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This inherits the tasktracker's X env variable on Windows. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *                 {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the child map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This inherits the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This inherits the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the child reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further, it can
   * reference existing environment variables via <code>$key</code> on
   * Linux or <code>%key%</code> on Windows.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This inherits the tasktracker's X env variable on Linux. </li>
   *   <li> B=%X%;c - This inherits the tasktracker's X env variable on Windows. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
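
  // Illustrative sketch: passing environment variables to child tasks in the
  // k1=v1,k2=v2 format described above. The variable names are examples only.
  //
  //   conf.set(JobConf.MAPRED_MAP_TASK_ENV,
  //            "A=foo,LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native");
  //   conf.set(JobConf.MAPRED_REDUCE_TASK_ENV, "B=bar");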

  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
    JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
    JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
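
  // Illustrative sketch: raising the map-task log level for a debugging run.
  // Level.DEBUG.toString() yields "DEBUG", one of the allowed values above.
  //
  //   conf.set(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, Level.DEBUG.toString());
  //   conf.set(JobConf.MAPRED_REDUCE_TASK_LOG_LEVEL,
  //            JobConf.DEFAULT_LOG_LEVEL.toString());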

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ID} instead
   */
  @Deprecated
  public static final String WORKFLOW_ID = MRJobConfig.WORKFLOW_ID;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NAME = MRJobConfig.WORKFLOW_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_NODE_NAME} instead
   */
  @Deprecated
  public static final String WORKFLOW_NODE_NAME =
      MRJobConfig.WORKFLOW_NODE_NAME;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_STRING} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_ADJACENCY_PREFIX_PATTERN} instead
   */
  @Deprecated
  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
      MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * use {@link MRJobConfig#WORKFLOW_TAGS} instead
   */
  @Deprecated
  public static final String WORKFLOW_TAGS = MRJobConfig.WORKFLOW_TAGS;

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final String MAPREDUCE_RECOVER_JOB =
      "mapreduce.job.restart.recover";

  /**
   * The variable is kept for M/R 1.x applications, M/R 2.x applications should
   * not use it
   */
  @Deprecated
  public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  @Private
  public void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }

  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }


  /**
   * Set the job's jar file by finding an example class location.
   *
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = ClassUtil.findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }
  }
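
  // Illustrative sketch: the two ways of pointing the framework at the job
  // jar. MyJob and the path below are hypothetical.
  //
  //   conf.setJarByClass(MyJob.class);      // infer the jar containing MyJob
  //   conf.setJar("/path/to/myjob.jar");    // or name the jar explicitly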

  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }



  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if framework should keep the intermediate files
   *             for failed tasks, <code>false</code> otherwise.
   *
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *        task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits.
   *
   * @param numTasks the number of tasks to execute; defaults to 1;
   * -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute.
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }
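
  // Illustrative sketch: JVM reuse settings. The values are examples, not
  // recommendations.
  //
  //   conf.setNumTasksToExecutePerJvm(1);    // default: one task per JVM
  //   conf.setNumTasksToExecutePerJvm(10);   // reuse each JVM for 10 tasks
  //   conf.setNumTasksToExecutePerJvm(-1);   // no limit within the job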

  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                             TextInputFormat.class,
                                                             InputFormat.class),
                                                    this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }

  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                              TextOutputFormat.class,
                                                              OutputFormat.class),
                                                     this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   *
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
      getClass("mapred.output.committer.class", FileOutputCommitter.class,
               OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the map-reduce
   *                 job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps to be compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }

  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   *
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress the
   *         map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec>
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }
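
  // Illustrative sketch: compressing intermediate map outputs. Note that
  // setMapOutputCompressorClass(Class) already enables compression, so the
  // first call is shown only for clarity. SnappyCodec is one codec shipped
  // with Hadoop; its availability depends on the native libraries.
  //
  //   conf.setCompressMapOutput(true);
  //   conf.setMapOutputCompressorClass(
  //       org.apache.hadoop.io.compress.SnappyCodec.class);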

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   *
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different than the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }
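
  // Illustrative sketch: sort on the second key field, numerically and in
  // reverse order, using the -k pos1[,pos2] syntax documented above.
  //
  //   conf.setKeyFieldComparatorOptions("-k2,2nr");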

  /**
   * Get the {@link KeyFieldBasedComparator} options.
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where,
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options.
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }

  /**
   * Get the user-defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the combiner.
   *
   * @return comparator set by the user for grouping values.
   * @see #setCombinerKeyGroupingComparator(Class) for details.
   */
  public RawComparator getCombinerKeyGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Get the user-defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }

  /**
   * Set the user-defined {@link RawComparator} comparator for
   * grouping keys in the input to the combiner.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the combiner sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the combiner being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys for the
   * combiner. It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setCombinerKeyGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
        theClass, RawComparator.class);
  }

  /**
   * Set the user-defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   * @see #setCombinerKeyGroupingComparator(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }
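
  // Illustrative sketch of the secondary-sort pattern described above,
  // assuming a composite (primary, secondary) key. FullKeyComparator and
  // PrimaryKeyGroupingComparator are hypothetical RawComparator
  // implementations supplied by the application.
  //
  //   conf.setOutputKeyComparatorClass(FullKeyComparator.class);
  //   conf.setOutputValueGroupingComparator(PrimaryKeyGroupingComparator.class);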

  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the reducer.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }

  /**
   * Get the value class for job outputs.
   *
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   */
  public void setOutputValueClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   *
   * @return the {@link Mapper} class for the job.
   */
  public Class<? extends Mapper> getMapperClass() {
    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  }

  /**
   * Set the {@link Mapper} class for the job.
   *
   * @param theClass the {@link Mapper} class for the job.
   */
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  /**
   * Get the {@link MapRunnable} class for the job.
   *
   * @return the {@link MapRunnable} class for the job.
   */
  public Class<? extends MapRunnable> getMapRunnerClass() {
    return getClass("mapred.map.runner.class",
                    MapRunner.class, MapRunnable.class);
  }

  /**
   * Expert: Set the {@link MapRunnable} class for the job.
   *
   * Typically used to exert greater control on {@link Mapper}s.
   *
   * @param theClass the {@link MapRunnable} class for the job.
   */
  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  }

  /**
   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
   * to be sent to the {@link Reducer}s.
   *
   * @return the {@link Partitioner} used to partition map-outputs.
   */
  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }

  /**
   * Set the {@link Partitioner} class used to partition
   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
   *
   * @param theClass the {@link Partitioner} used to partition map-outputs.
   */
  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
    setClass("mapred.partitioner.class", theClass, Partitioner.class);
  }

  /**
   * Get the {@link Reducer} class for the job.
   *
   * @return the {@link Reducer} class for the job.
   */
  public Class<? extends Reducer> getReducerClass() {
    return getClass("mapred.reducer.class",
                    IdentityReducer.class, Reducer.class);
  }

  /**
   * Set the {@link Reducer} class for the job.
   *
   * @param theClass the {@link Reducer} class for the job.
   */
  public void setReducerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.reducer.class", theClass, Reducer.class);
  }

  /**
   * Get the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
   *
   * @return the user-defined combiner class used to combine map-outputs.
   */
  public Class<? extends Reducer> getCombinerClass() {
    return getClass("mapred.combiner.class", null, Reducer.class);
  }

  /**
   * Set the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers.
   *
   * <p>The combiner is an application-specified aggregation operation, which
   * can help cut down the amount of data transferred between the
   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
   *
   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
   * the mapper and reducer tasks. In general, the combiner is called as the
   * sort/merge result is written to disk. The combiner must:
   * <ul>
   *   <li> be side-effect free</li>
   *   <li> have the same input and output key types and the same input and
   *        output value types</li>
   * </ul>
   *
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the
   * job i.e. {@link #setReducerClass(Class)}.</p>
   *
   * @param theClass the user-defined combiner class used to combine
   *                 map-outputs.
   */
  public void setCombinerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.combiner.class", theClass, Reducer.class);
  }
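
  // Illustrative sketch: for an aggregation such as word count, the Reducer
  // can usually double as the combiner, since summing partial counts is
  // associative and side-effect free. WordCountReducer is hypothetical.
  //
  //   conf.setCombinerClass(WordCountReducer.class);
  //   conf.setReducerClass(WordCountReducer.class);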

  /**
   * Should speculative execution be used for this job?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution is to be used for this
   *         job, <code>false</code> otherwise.
   */
  public boolean getSpeculativeExecution() {
    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    setMapSpeculativeExecution(speculativeExecution);
    setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for map tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution is to be
   *                           used for this job for map tasks,
   *         <code>false</code> otherwise.
   */
  public boolean getMapSpeculativeExecution() {
    return getBoolean(JobContext.MAP_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for reduce tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution is to be used
   *                           for reduce tasks for this job,
   *         <code>false</code> otherwise.
   */
  public boolean getReduceSpeculativeExecution() {
    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.REDUCE_SPECULATIVE,
               speculativeExecution);
  }

  /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   *
   * @return the number of map tasks for this job.
   */
  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }

  /**
   * Set the number of map tasks for this job.
   *
   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
   * number of spawned map tasks depends on the number of {@link InputSplit}s
   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
   *
   * A custom {@link InputFormat} is typically used to accurately control
   * the number of map tasks for the job.</p>
   *
   * <b id="NoOfMaps">How many maps?</b>
   *
   * <p>The number of maps is usually driven by the total size of the inputs
   * i.e. total number of blocks of the input files.</p>
   *
   * <p>The right level of parallelism for maps seems to be around 10-100 maps
   * per-node, although it has been set up to 300 or so for very cpu-light map
   * tasks. Task setup takes a while, so it is best if the maps take at least a
   * minute to execute.</p>
   *
   * <p>The default behavior of file-based {@link InputFormat}s is to split the
   * input into <i>logical</i> {@link InputSplit}s based on the total size, in
   * bytes, of input files. However, the {@link FileSystem} blocksize of the
   * input files is treated as an upper bound for input splits. A lower bound
   * on the split size can be set via
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
   *
   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
   * used to set it even higher.</p>
   *
   * @param n the number of map tasks for this job.
   * @see InputFormat#getSplits(JobConf, int)
   * @see FileInputFormat
   * @see FileSystem#getDefaultBlockSize()
   * @see FileStatus#getBlockSize()
   */
  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
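
  // Worked example for the arithmetic above (a sketch): 10 TB of input at a
  // 128 MB block size is 10 * 1024 * 1024 MB / 128 MB = 81,920 blocks, i.e.
  // roughly 82,000 map tasks, unless a higher value is requested:
  //
  //   conf.setNumMapTasks(100000);   // a hint only; the InputFormat decides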

  /**
   * Get the configured number of reduce tasks for this job. Defaults to
   * <code>1</code>.
   *
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }

  /**
   * Set the requisite number of reduce tasks for this job.
   *
   * <b id="NoOfReduces">How many reduces?</b>
   *
   * <p>The right number of reduces seems to be <code>0.95</code> or
   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; *
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
   * </p>
   *
   * <p>With <code>0.95</code> all of the reduces can launch immediately and
   * start transferring map outputs as the maps finish. With <code>1.75</code>
   * the faster nodes will finish their first round of reduces and launch a
   * second wave of reduces doing a much better job of load balancing.</p>
   *
   * <p>Increasing the number of reduces increases the framework overhead, but
   * increases load balancing and lowers the cost of failures.</p>
   *
   * <p>The scaling factors above are slightly less than whole numbers to
   * reserve a few reduce slots in the framework for speculative-tasks, failures
   * etc.</p>
   *
   * <b id="ReducerNone">Reducer NONE</b>
   *
   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
   *
   * <p>In this case the outputs of the map-tasks go directly to the
   * distributed file-system, to the path set by
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
   *
   * @param n the number of reduce tasks for this job.
   */
  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
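
  // Worked example for the scaling factors above (a sketch): on a 20-node
  // cluster with mapreduce.tasktracker.reduce.tasks.maximum = 2,
  // 0.95 * 20 * 2 = 38 reduces (one wave) or 1.75 * 20 * 2 = 70 reduces
  // (two waves, with better load balancing).
  //
  //   conf.setNumReduceTasks(38);
  //   conf.setNumReduceTasks(0);   // map-only job: no sort, outputs go to HDFS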
1380 
1381   /**
1382    * Get the configured number of maximum attempts that will be made to run a
1383    * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1384    * property. If this property is not already set, the default is 4 attempts.
1385    *
1386    * @return the max number of attempts per map task.
1387    */
getMaxMapAttempts()1388   public int getMaxMapAttempts() {
1389     return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1390   }
1391 
1392   /**
1393    * Expert: Set the number of maximum attempts that will be made to run a
1394    * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
  }

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per reduce task.
   */
  public int getMaxReduceAttempts() {
    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
  }

  /**
   * Get the user-specified job name. This is only used to identify the
   * job to the user.
   *
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return get(JobContext.JOB_NAME, "");
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   */
  public void setJobName(String name) {
    set(JobContext.JOB_NAME, name);
  }

  /**
   * Get the user-specified session identifier. The default is the empty string.
   *
   * The session identifier is used to tag metric data that is reported to some
   * performance metrics system via the org.apache.hadoop.metrics API. The
   * session identifier is intended, in particular, for use by Hadoop-On-Demand
   * (HOD), which allocates a virtual Hadoop cluster dynamically and
   * transiently. HOD will set the session identifier by modifying the
   * mapred-site.xml file before starting the cluster.
   *
   * When not running under HOD, this identifier is expected to remain set to
   * the empty string.
   *
   * @return the session identifier, defaulting to "".
   */
  @Deprecated
  public String getSessionId() {
    return get("session.id", "");
  }

  /**
   * Set the user-specified session identifier.
   *
   * @param sessionId the new session id.
   */
  @Deprecated
  public void setSessionId(String sessionId) {
    set("session.id", sessionId);
  }

  /**
   * Set the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds <code>noFailures</code>, the
   * tasktracker is <i>blacklisted</i> for this job.
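   *
   * <p>A minimal sketch (assuming an existing <code>JobConf job</code>):</p>
   * <p><blockquote><pre>
   * job.setMaxTaskFailuresPerTracker(4); // blacklist a tracker for this job
   *                                      // after its 4th task failure
   * </pre></blockquote>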
   *
   * @param noFailures maximum no. of failures of a given job per tasktracker.
   */
  public void setMaxTaskFailuresPerTracker(int noFailures) {
    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
  }

  /**
   * Expert: Get the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds this, the tasktracker is
   * <i>blacklisted</i> for this job.
   *
   * @return the maximum no. of failures of a given job per tasktracker.
   */
  public int getMaxTaskFailuresPerTracker() {
    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
  }

  /**
   * Get the maximum percentage of map tasks that can fail without
   * the job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
   * the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of map tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxMapTaskFailuresPercent() {
    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
  }

  /**
   * Expert: Set the maximum percentage of map tasks that can fail without the
   * job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
   * before being declared as <i>failed</i>.
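   *
   * <p>A minimal sketch (assuming an existing <code>JobConf job</code>):</p>
   * <p><blockquote><pre>
   * job.setMaxMapTaskFailuresPercent(5); // tolerate up to 5% of maps failing
   * </pre></blockquote>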
   *
   * @param percent the maximum percentage of map tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxMapTaskFailuresPercent(int percent) {
    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
  }

  /**
   * Get the maximum percentage of reduce tasks that can fail without
   * the job being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
   * in the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of reduce tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxReduceTaskFailuresPercent() {
    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
  }

  /**
   * Set the maximum percentage of reduce tasks that can fail without the job
   * being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of reduce tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxReduceTaskFailuresPercent(int percent) {
    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
  }

  /**
   * Set the {@link JobPriority} for this job.
   *
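   * <p>A minimal sketch (assuming an existing <code>JobConf job</code>):</p>
   * <p><blockquote><pre>
   * job.setJobPriority(JobPriority.HIGH);
   * </pre></blockquote>
   *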
   * @param prio the {@link JobPriority} for this job.
   */
  public void setJobPriority(JobPriority prio) {
    set(JobContext.PRIORITY, prio.toString());
  }

  /**
   * Get the {@link JobPriority} for this job.
   *
   * @return the {@link JobPriority} for this job.
   */
  public JobPriority getJobPriority() {
    String prio = get(JobContext.PRIORITY);
    if (prio == null) {
      return JobPriority.NORMAL;
    }

    return JobPriority.valueOf(prio);
  }

  /**
   * Set the JobSubmitHostName for this job.
   *
   * @param hostname the JobSubmitHostName for this job.
   */
  void setJobSubmitHostName(String hostname) {
    set(MRJobConfig.JOB_SUBMITHOST, hostname);
  }

  /**
   * Get the JobSubmitHostName for this job.
   *
   * @return the JobSubmitHostName for this job.
   */
  String getJobSubmitHostName() {
    return get(MRJobConfig.JOB_SUBMITHOST);
  }

  /**
   * Set the JobSubmitHostAddress for this job.
   *
   * @param hostadd the JobSubmitHostAddress for this job.
   */
  void setJobSubmitHostAddress(String hostadd) {
    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
  }

  /**
   * Get the JobSubmitHostAddress for this job.
   *
   * @return the JobSubmitHostAddress for this job.
   */
  String getJobSubmitHostAddress() {
    return get(MRJobConfig.JOB_SUBMITHOSTADDR);
  }

  /**
   * Get whether task profiling is enabled.
   * @return true if some tasks will be profiled
   */
  public boolean getProfileEnabled() {
    return getBoolean(JobContext.TASK_PROFILE, false);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    setBoolean(JobContext.TASK_PROFILE, newValue);
  }

  /**
   * Get the profiler configuration arguments.
   *
   * The default value for this property is
   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
   *
   * @return the parameters to pass to the task child to configure profiling
   */
  public String getProfileParams() {
    return get(JobContext.TASK_PROFILE_PARAMS,
        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    set(JobContext.TASK_PROFILE_PARAMS, value);
  }

  /**
   * Get the range of maps or reduces to profile.
   * @param isMap is the task a map?
   * @return the task ranges
   */
  public IntegerRanges getProfileTaskRange(boolean isMap) {
    return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
                       JobContext.NUM_REDUCE_PROFILES), "0-2");
  }

  /**
   * Set the ranges of maps or reduces to profile.
   * <code>setProfileEnabled(true)</code> must also be called.
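   *
   * <p>A minimal sketch (assuming an existing <code>JobConf job</code>):</p>
   * <p><blockquote><pre>
   * job.setProfileEnabled(true);
   * job.setProfileTaskRange(true, "0-2,5"); // profile map tasks 0, 1, 2 and 5
   * </pre></blockquote>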
   *
   * @param isMap is the task a map?
   * @param newValue a set of integer ranges of the map ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    // parse the value to make sure it is legal
    new Configuration.IntegerRanges(newValue);
    set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
        newValue);
  }

  /**
   * Set the debug script to run when the map tasks fail.
   *
   * <p>The debug script can aid debugging of failed map tasks. The script is
   * given the task's stdout, stderr, syslog and jobconf files as arguments.</p>
   *
   * <p>The debug command, run on the node where the map failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote>
   *
   * <p>The script file is distributed through {@link DistributedCache}
   * APIs. The script needs to be symlinked.</p>
   *
   * <p>Here is an example of how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setMapDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
   * </pre></blockquote>
   *
   * @param mDbgScript the script name
   */
  public void setMapDebugScript(String mDbgScript) {
    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
  }

  /**
   * Get the map task's debug script.
   *
   * @return the debug script for the mapred job for failed map tasks.
   * @see #setMapDebugScript(String)
   */
  public String getMapDebugScript() {
    return get(JobContext.MAP_DEBUG_SCRIPT);
  }

  /**
   * Set the debug script to run when the reduce tasks fail.
   *
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   *
   * <p>The debug command, run on the node where the reduce failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote>
   *
   * <p>The script file is distributed through {@link DistributedCache}
   * APIs. The script file needs to be symlinked.</p>
   *
   * <p>Here is an example of how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setReduceDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
   * </pre></blockquote>
   *
   * @param rDbgScript the script name
   */
  public void setReduceDebugScript(String rDbgScript) {
    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
  }

  /**
   * Get the reduce task's debug script.
   *
   * @return the debug script for the mapred job for failed reduce tasks.
   * @see #setReduceDebugScript(String)
   */
  public String getReduceDebugScript() {
    return get(JobContext.REDUCE_DEBUG_SCRIPT);
  }

  /**
   * Get the URI to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * @return the job end notification URI, <code>null</code> if it hasn't
   *         been set.
   * @see #setJobEndNotificationURI(String)
   */
  public String getJobEndNotificationURI() {
    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
  }

  /**
   * Set the URI to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * <p>The URI can contain two special parameters: <tt>$jobId</tt> and
   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
   * identifier and completion-status respectively.</p>
   *
   * <p>This is typically used by application-writers to implement chaining of
   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
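   *
   * <p>For example (the host <code>example.com</code> is just a
   * placeholder):</p>
   * <p><blockquote><pre>
   * job.setJobEndNotificationURI(
   *     "http://example.com/notify?jobid=$jobId&amp;status=$jobStatus");
   * </pre></blockquote>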
   *
   * @param uri the job end notification URI
   * @see JobStatus
   */
  public void setJobEndNotificationURI(String uri) {
    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
  }

  /**
   * Get the job-specific shared directory for use as scratch space.
   *
   * <p>
   * When a job starts, a shared directory is created at location
   * <code>
   * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
   * This directory is exposed to the users through
   * <code>mapreduce.job.local.dir</code>, so tasks can use this space as
   * scratch space and share files among them.</p>
   * This value is also available as a System property.
   *
   * @return the localized job-specific shared directory
   */
  public String getJobLocalDir() {
    return get(JobContext.JOB_LOCAL_DIR);
  }

  /**
   * Get the memory required to run a map task of the job, in MB.
   *
   * If a value is specified in the configuration, it is returned.
   * Else, it returns {@link JobContext#DEFAULT_MAP_MEMORY_MB}.
   * <p>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.
   * @return memory required to run a map task of the job, in MB.
   */
  public long getMemoryForMapTask() {
    long value = getDeprecatedMemoryValue();
    if (value < 0) {
      return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
          JobContext.DEFAULT_MAP_MEMORY_MB);
    }
    return value;
  }

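  /**
   * Set the memory required to run a map task of the job, in MB. A sketch of
   * typical usage (the 1536 MB figure is just an illustrative value):
   * <p><blockquote><pre>
   * job.setMemoryForMapTask(1536); // request 1.5 GB per map task
   * </pre></blockquote>
   *
   * @param mem memory required to run a map task of the job, in MB.
   */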
  public void setMemoryForMapTask(long mem) {
    setLong(JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY, mem);
    // In case M/R 1.x applications use the old property name
    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
  }

  /**
   * Get the memory required to run a reduce task of the job, in MB.
   *
   * If a value is specified in the configuration, it is returned.
   * Else, it returns {@link JobContext#DEFAULT_REDUCE_MEMORY_MB}.
   * <p>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.
   * @return memory required to run a reduce task of the job, in MB.
   */
  public long getMemoryForReduceTask() {
    long value = getDeprecatedMemoryValue();
    if (value < 0) {
      return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
          JobContext.DEFAULT_REDUCE_MEMORY_MB);
    }
    return value;
  }

  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
  // converted into MB.
  // Returns DISABLED_MEMORY_LIMIT if unset, or if set to a negative value.
  private long getDeprecatedMemoryValue() {
    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
        DISABLED_MEMORY_LIMIT);
    if (oldValue > 0) {
      oldValue /= (1024 * 1024);
    }
    return oldValue;
  }

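  /**
   * Set the memory required to run a reduce task of the job, in MB. A sketch
   * of typical usage (the 3072 MB figure is just an illustrative value):
   * <p><blockquote><pre>
   * job.setMemoryForReduceTask(3072); // request 3 GB per reduce task
   * </pre></blockquote>
   *
   * @param mem memory required to run a reduce task of the job, in MB.
   */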
  public void setMemoryForReduceTask(long mem) {
    setLong(JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
    // In case M/R 1.x applications use the old property name
    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
  }

  /**
   * Return the name of the queue to which this job is submitted.
   * Defaults to <code>'default'</code>.
   *
   * @return name of the queue
   */
  public String getQueueName() {
    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
  }

  /**
   * Set the name of the queue to which this job should be submitted.
   *
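   * <p>A minimal sketch (the queue name <code>"analytics"</code> is just an
   * illustrative placeholder):</p>
   * <p><blockquote><pre>
   * job.setQueueName("analytics");
   * </pre></blockquote>
   *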
   * @param queueName Name of the queue
   */
  public void setQueueName(String queueName) {
    set(JobContext.QUEUE_NAME, queueName);
  }

  /**
   * Normalize the negative values in configuration.
   *
   * @param val the memory configuration value to normalize
   * @return normalized value
   */
  public static long normalizeMemoryConfigValue(long val) {
    if (val < 0) {
      val = DISABLED_MEMORY_LIMIT;
    }
    return val;
  }

  /**
   * Find a jar that contains a class of the same name, if any.
   * It will return a jar file, even if that is not the first thing
   * on the class path that has a class with the same name.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  public static String findContainingJar(Class my_class) {
    return ClassUtil.findContainingJar(my_class);
  }

  /**
   * Get the memory required to run a task of this job, in bytes. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   * <p>
   * This method is deprecated. Now, different memory limits can be
   * set for map and reduce tasks of a job, in MB.
   * <p>
   * For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned.
   * Otherwise, this method will return the larger of the values returned by
   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
   * after converting them into bytes.
   *
   * @return memory required to run a task of this job, in bytes.
   * @see #setMaxVirtualMemoryForTask(long)
   * @deprecated Use {@link #getMemoryForMapTask()} and
   *             {@link #getMemoryForReduceTask()}
   */
  @Deprecated
  public long getMaxVirtualMemoryForTask() {
    LOG.warn(
        "getMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");

    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
        Math.max(getMemoryForMapTask(), getMemoryForReduceTask()) * 1024 * 1024);
    return value;
  }

  /**
   * Set the maximum amount of memory any task of this job can use. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   * <p>
   * For backward compatibility, <code>mapred.task.maxvmem</code> (in bytes)
   * is split into <code>mapreduce.map.memory.mb</code> and
   * <code>mapreduce.reduce.memory.mb</code>; each of the new keys is set to
   * <code>mapred.task.maxvmem / (1024 * 1024)</code>, since the new values
   * are in MB.
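   *
   * <p>For example, a sketch of the conversion (the 2 GB figure is just an
   * illustrative value):</p>
   * <p><blockquote><pre>
   * job.setMaxVirtualMemoryForTask(2L * 1024 * 1024 * 1024); // 2 GB in bytes
   * // When mapred.task.maxvmem is not already set, this is roughly
   * // equivalent to:
   * // job.setMemoryForMapTask(2048);    // MB
   * // job.setMemoryForReduceTask(2048); // MB
   * </pre></blockquote>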
   *
   * @param vmem Maximum amount of virtual memory in bytes any task of this job
   *             can use.
   * @see #getMaxVirtualMemoryForTask()
   * @deprecated
   *  Use {@link #setMemoryForMapTask(long mem)} and
   *  {@link #setMemoryForReduceTask(long mem)}
   */
  @Deprecated
  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
    if (vmem < 0) {
      throw new IllegalArgumentException("Task memory allocation may not be < 0");
    }

    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024));    // convert bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // convert bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
  }

  /**
   * @deprecated this variable is deprecated and no longer in use.
   */
  @Deprecated
  public long getMaxPhysicalMemoryForTask() {
    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
        + " Refer to the APIs getMemoryForMapTask() and"
        + " getMemoryForReduceTask() for details.");
    return -1;
  }

  /**
   * @deprecated this variable is deprecated and no longer in use.
   */
  @Deprecated
  public void setMaxPhysicalMemoryForTask(long mem) {
    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
        + " The value set is ignored. Refer to"
        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
  }

  static String deprecatedString(String key) {
    return "The variable " + key + " is no longer used.";
  }

  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
          + " Instead use " + JobConf.MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY
          + " and " + JobConf.MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
  }

}