/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

/**
 * The job submitter's view of the Job. It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.
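 *
 * <p>Here is a minimal sketch of how a job is typically configured and
 * submitted; the <code>MyMapper</code> and <code>MyReducer</code> classes
 * below are placeholders for user code:</p>
 * <p><blockquote><pre>
 *     // Create a new Job with a job-specific Configuration
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyMapper.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *     job.setMapperClass(MyMapper.class);
 *     job.setReducerClass(MyReducer.class);
 *     job.setOutputKeyClass(Text.class);
 *     job.setOutputValueClass(IntWritable.class);
 *
 *     // Submit the job, then poll for progress until it is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>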
 */
public class Job extends JobContext {
  public static enum JobState {DEFINE, RUNNING};
  private JobState state = JobState.DEFINE;
  private JobClient jobClient;
  private RunningJob info;

  /**
   * Creates a new {@link Job} with a generic {@link Configuration}.
   *
   * @return the {@link Job}
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with a given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the {@link Configuration}
   * @return the {@link Job}
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with a given {@link Configuration}
   * and a given jobName.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the {@link Configuration}
   * @param jobName the job instance's name
   * @return the {@link Job}
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  public Job() throws IOException {
    this(new Configuration());
  }

  public Job(Configuration conf) throws IOException {
    super(conf, null);
  }

  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  JobClient getJobClient() {
    return jobClient;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && jobClient == null) {
      throw new IllegalStateException("Job in state " + JobState.RUNNING +
                                      " however jobClient is not initialized!");
    }
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    conf.setJarByClass(cls);
  }

  /**
   * Get the pathname of the job's jar.
   * @return the pathname
   */
  public String getJar() {
    return conf.getJar();
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different from the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}.
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return info.getTrackingURL();
  }

  /**
   * Get the <i>progress</i> of the job's setup, as a float between 0.0
   * and 1.0.  When the job setup is completed, the function returns 1.0.
   *
   * @return the progress of the job's setup.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.setupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.mapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.reduceProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    return info.isComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    return info.isSuccessful();
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    info.killJob();
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
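   *
   * <p>A sketch of polling for new events while the job runs; the one-second
   * sleep interval is arbitrary and exception handling is omitted:</p>
   * <pre>
   *     int from = 0;
   *     while (!job.isComplete()) {
   *       TaskCompletionEvent[] events = job.getTaskCompletionEvents(from);
   *       from += events.length;
   *       for (TaskCompletionEvent event : events) {
   *         System.out.println(event);
   *       }
   *       Thread.sleep(1000);
   *     }
   * </pre>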
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
                                                       ) throws IOException {
    ensureState(JobState.RUNNING);
    return info.getTaskCompletionEvents(startFrom);
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(TaskAttemptID taskId) throws IOException {
    ensureState(JobState.RUNNING);
    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
                  false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(TaskAttemptID taskId) throws IOException {
    ensureState(JobState.RUNNING);
    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
                  true);
  }

  /**
   * Gets the counters for this job.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters() throws IOException {
    ensureState(JobState.RUNNING);
    return new Counters(info.getCounters());
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
  }

  /**
   * Open a connection to the JobTracker.
   * @throws IOException
   * @throws InterruptedException
   */
  private void connect() throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        // Create the JobClient as the submitting user so that the
        // connection carries that user's credentials.
        jobClient = new JobClient((JobConf) getConfiguration());
        return null;
      }
    });
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
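   *
   * <p>A typical driver propagates the result as the process exit code,
   * for example:</p>
   * <pre>
   *     System.exit(job.waitForCompletion(true) ? 0 : 1);
   * </pre>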
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }

}