/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

/**
 * The job submitter's view of the Job. It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.
 *
 * <p>A job starts in {@link JobState#DEFINE}. A successful {@link #submit()}
 * moves it to {@link JobState#RUNNING}. From then on the query and control
 * methods (progress, counters, kill, ...) are usable; before that they throw
 * {@link IllegalStateException}.
 */
public class Job extends JobContext {

  /** Lifecycle states of a {@link Job}. */
  public static enum JobState {DEFINE, RUNNING}

  /** Current lifecycle state; set to RUNNING as the last step of submit(). */
  private JobState state = JobState.DEFINE;

  /** Client created by connect() during submission; null before that. */
  private JobClient jobClient;

  /** Handle on the submitted job; null until submit() succeeds. */
  private RunningJob info;

  /**
   * Creates a new {@link Job} with a fresh, generic {@link Configuration}.
   *
   * @return the {@link Job}
   * @throws IOException if the underlying job context cannot be created
   */
  public static Job getInstance() throws IOException {
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with a given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the {@link Configuration}
   * @return the {@link Job}
   * @throws IOException if the underlying job context cannot be created
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // Wrap in a JobConf so later modifications go to the copy, not to conf.
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with a given {@link Configuration}
   * and a given jobName.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the {@link Configuration}
   * @param jobName the job instance's name
   * @return the {@link Job}
   * @throws IOException if the underlying job context cannot be created
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a job backed by a new default {@link Configuration}.
   *
   * @throws IOException if the underlying job context cannot be created
   */
  public Job() throws IOException {
    this(new Configuration());
  }

  /**
   * Creates a job backed by the given configuration.
   *
   * @param conf the configuration for the job
   * @throws IOException if the underlying job context cannot be created
   */
  public Job(Configuration conf) throws IOException {
    // NOTE(review): the null second argument appears to be the job ID, which
    // submit() fills in later via setJobID(); confirm against JobContext.
    super(conf, null);
  }

  /**
   * Creates a job backed by the given configuration and with the given name.
   *
   * @param conf the configuration for the job
   * @param jobName the job instance's name
   * @throws IOException if the underlying job context cannot be created
   */
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  /** @return the client created on submission, or null if not yet submitted */
  JobClient getJobClient() {
    return jobClient;
  }

  /**
   * Verifies that the job is in the required lifecycle state.
   *
   * @param required the state the caller needs the job to be in
   * @throws IllegalStateException if the job is in a different state, or is
   *         RUNNING without an initialized job client
   */
  private void ensureState(JobState required) throws IllegalStateException {
    if (required != state) {
      throw new IllegalStateException("Job in state "+ state +
                                      " instead of " + required);
    }

    if (required == JobState.RUNNING && jobClient == null) {
      throw new IllegalStateException("Job in state " + JobState.RUNNING +
                                      " however jobClient is not initialized!");
    }
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   * @throws IOException declared for callers; propagated from the
   *         configuration if it fails
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   * @throws IllegalStateException if the job is submitted
   */
  public void setJarByClass(Class<?> cls) {
    // Like every other setter, only allowed before submission.
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Get the pathname of the job's jar.
   * @return the pathname
   */
  public String getJar() {
    return conf.getJar();
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   * @throws IllegalStateException if the job has not been submitted
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return info.getTrackingURL();
  }

  /**
   * Get the <i>progress</i> of the job's setup, as a float between 0.0
   * and 1.0.  When the job setup is completed, the function returns 1.0.
   *
   * @return the progress of the job's setup.
   * @throws IOException if the job status cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.setupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException if the job status cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.mapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException if the job status cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.reduceProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException if the job status cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    return info.isComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException if the job status cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    return info.isSuccessful();
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException if the kill request fails
   * @throws IllegalStateException if the job has not been submitted
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    info.killJob();
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException if the events cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
                                                       ) throws IOException {
    ensureState(JobState.RUNNING);
    return info.getTaskCompletionEvents(startFrom);
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException if the kill request fails
   * @throws IllegalStateException if the job has not been submitted
   */
  public void killTask(TaskAttemptID taskId) throws IOException {
    ensureState(JobState.RUNNING);
    // false = kill the attempt (failTask passes true to fail it instead).
    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
                  false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be terminated.
   * @throws IOException if the fail request fails
   * @throws IllegalStateException if the job has not been submitted
   */
  public void failTask(TaskAttemptID taskId) throws IOException {
    ensureState(JobState.RUNNING);
    // true = fail the attempt (killTask passes false to merely kill it).
    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
                  true);
  }

  /**
   * Gets the counters for this job.
   *
   * @return the counters for this job.
   * @throws IOException if the counters cannot be retrieved
   * @throws IllegalStateException if the job has not been submitted
   */
  public Counters getCounters() throws IOException {
    ensureState(JobState.RUNNING);
    return new Counters(info.getCounters());
  }

  /**
   * Fails if the given configuration attribute has been set.
   *
   * @param attr the configuration key that must be unset
   * @param msg the API mode name used in the error message
   * @throws IOException if {@code attr} has a value
   */
  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   *
   * @param value whether to cancel the delegation tokens on completion
   * @throws IllegalStateException if the job is submitted
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    // Use the new mapper API unless the user set an old-API mapper class
    // (an explicit "mapred.mapper.new-api" value always wins).
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      // New map API: no old-API map-side classes may be configured.
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
       } else {
        // Map-only job: the map side also writes the final output.
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      // Old map API: no new-API map-side classes may be configured.
      String mode = "map compatability";
      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
       } else {
        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      // Same rules for the reduce side, only relevant when reducers run.
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatability";
        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  /**
   * Submit the job to the cluster and return immediately.
   *
   * @throws IllegalStateException if the job was already submitted
   * @throws IOException if the configuration mixes old and new APIs or the
   *         submission fails
   * @throws InterruptedException if interrupted while connecting
   * @throws ClassNotFoundException declared for callers; presumably propagated
   *         from job submission (NOTE(review): confirm)
   */
  public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    // Only flip the state once submission has succeeded.
    state = JobState.RUNNING;
   }

  /**
   * Open a connection to the JobTracker.
   *
   * @throws IOException if the client cannot be created
   * @throws InterruptedException if interrupted while running as the user
   */
  private void connect() throws IOException, InterruptedException {
    // NOTE(review): ugi is inherited from JobContext; presumably the
    // submitting user's identity, so the client is created as that user.
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws IOException {
        jobClient = new JobClient((JobConf) getConfiguration());
        return null;
      }
    });
  }

  /**
   * Submit the job to the cluster (if not already submitted) and wait for it
   * to finish.
   *
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   * @throws InterruptedException if interrupted while submitting
   * @throws ClassNotFoundException propagated from {@link #submit()}
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }

}