1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.lib.jobcontrol; 20 21 22 import java.io.IOException; 23 import java.util.ArrayList; 24 import java.util.List; 25 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.apache.hadoop.classification.InterfaceAudience; 29 import org.apache.hadoop.classification.InterfaceStability; 30 import org.apache.hadoop.conf.Configuration; 31 import org.apache.hadoop.fs.FileSystem; 32 import org.apache.hadoop.fs.Path; 33 import org.apache.hadoop.mapreduce.Job; 34 import org.apache.hadoop.mapreduce.JobID; 35 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 36 import org.apache.hadoop.util.StringUtils; 37 38 /** 39 * This class encapsulates a MapReduce job and its dependency. It monitors 40 * the states of the depending jobs and updates the state of this job. 41 * A job starts in the WAITING state. If it does not have any depending jobs, 42 * or all of the depending jobs are in SUCCESS state, then the job state 43 * will become READY. If any depending jobs fail, the job will fail too. 44 * When in READY state, the job can be submitted to Hadoop for execution, with 45 * the state changing into RUNNING state. From RUNNING state, the job 46 * can get into SUCCESS or FAILED state, depending 47 * the status of the job execution. 48 */ 49 @InterfaceAudience.Public 50 @InterfaceStability.Evolving 51 public class ControlledJob { 52 private static final Log LOG = LogFactory.getLog(ControlledJob.class); 53 54 // A job will be in one of the following states 55 public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED, 56 DEPENDENT_FAILED}; 57 public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; 58 private State state; 59 private String controlID; // assigned and used by JobControl class 60 private Job job; // mapreduce job to be executed. 61 // some info for human consumption, e.g. the reason why the job failed 62 private String message; 63 // the jobs the current job depends on 64 private List<ControlledJob> dependingJobs; 65 66 /** 67 * Construct a job. 68 * @param job a mapreduce job to be executed. 69 * @param dependingJobs an array of jobs the current job depends on 70 */ ControlledJob(Job job, List<ControlledJob> dependingJobs)71 public ControlledJob(Job job, List<ControlledJob> dependingJobs) 72 throws IOException { 73 this.job = job; 74 this.dependingJobs = dependingJobs; 75 this.state = State.WAITING; 76 this.controlID = "unassigned"; 77 this.message = "just initialized"; 78 } 79 80 /** 81 * Construct a job. 82 * 83 * @param conf mapred job configuration representing a job to be executed. 84 * @throws IOException 85 */ ControlledJob(Configuration conf)86 public ControlledJob(Configuration conf) throws IOException { 87 this(Job.getInstance(conf), null); 88 } 89 90 @Override toString()91 public String toString() { 92 StringBuffer sb = new StringBuffer(); 93 sb.append("job name:\t").append(this.job.getJobName()).append("\n"); 94 sb.append("job id:\t").append(this.controlID).append("\n"); 95 sb.append("job state:\t").append(this.state).append("\n"); 96 sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n"); 97 sb.append("job message:\t").append(this.message).append("\n"); 98 99 if (this.dependingJobs == null || this.dependingJobs.size() == 0) { 100 sb.append("job has no depending job:\t").append("\n"); 101 } else { 102 sb.append("job has ").append(this.dependingJobs.size()). 103 append(" dependeng jobs:\n"); 104 for (int i = 0; i < this.dependingJobs.size(); i++) { 105 sb.append("\t depending job ").append(i).append(":\t"); 106 sb.append((this.dependingJobs.get(i)).getJobName()).append("\n"); 107 } 108 } 109 return sb.toString(); 110 } 111 112 /** 113 * @return the job name of this job 114 */ getJobName()115 public String getJobName() { 116 return job.getJobName(); 117 } 118 119 /** 120 * Set the job name for this job. 121 * @param jobName the job name 122 */ setJobName(String jobName)123 public void setJobName(String jobName) { 124 job.setJobName(jobName); 125 } 126 127 /** 128 * @return the job ID of this job assigned by JobControl 129 */ getJobID()130 public String getJobID() { 131 return this.controlID; 132 } 133 134 /** 135 * Set the job ID for this job. 136 * @param id the job ID 137 */ setJobID(String id)138 public void setJobID(String id) { 139 this.controlID = id; 140 } 141 142 /** 143 * @return the mapred ID of this job as assigned by the mapred framework. 144 */ getMapredJobId()145 public synchronized JobID getMapredJobId() { 146 return this.job.getJobID(); 147 } 148 149 /** 150 * @return the mapreduce job 151 */ getJob()152 public synchronized Job getJob() { 153 return this.job; 154 } 155 156 /** 157 * Set the mapreduce job 158 * @param job the mapreduce job for this job. 159 */ setJob(Job job)160 public synchronized void setJob(Job job) { 161 this.job = job; 162 } 163 164 /** 165 * @return the state of this job 166 */ getJobState()167 public synchronized State getJobState() { 168 return this.state; 169 } 170 171 /** 172 * Set the state for this job. 173 * @param state the new state for this job. 174 */ setJobState(State state)175 protected synchronized void setJobState(State state) { 176 this.state = state; 177 } 178 179 /** 180 * @return the message of this job 181 */ getMessage()182 public synchronized String getMessage() { 183 return this.message; 184 } 185 186 /** 187 * Set the message for this job. 188 * @param message the message for this job. 189 */ setMessage(String message)190 public synchronized void setMessage(String message) { 191 this.message = message; 192 } 193 194 /** 195 * @return the depending jobs of this job 196 */ getDependentJobs()197 public List<ControlledJob> getDependentJobs() { 198 return this.dependingJobs; 199 } 200 201 /** 202 * Add a job to this jobs' dependency list. 203 * Dependent jobs can only be added while a Job 204 * is waiting to run, not during or afterwards. 205 * 206 * @param dependingJob Job that this Job depends on. 207 * @return <tt>true</tt> if the Job was added. 208 */ addDependingJob(ControlledJob dependingJob)209 public synchronized boolean addDependingJob(ControlledJob dependingJob) { 210 if (this.state == State.WAITING) { //only allowed to add jobs when waiting 211 if (this.dependingJobs == null) { 212 this.dependingJobs = new ArrayList<ControlledJob>(); 213 } 214 return this.dependingJobs.add(dependingJob); 215 } else { 216 return false; 217 } 218 } 219 220 /** 221 * @return true if this job is in a complete state 222 */ isCompleted()223 public synchronized boolean isCompleted() { 224 return this.state == State.FAILED || 225 this.state == State.DEPENDENT_FAILED || 226 this.state == State.SUCCESS; 227 } 228 229 /** 230 * @return true if this job is in READY state 231 */ isReady()232 public synchronized boolean isReady() { 233 return this.state == State.READY; 234 } 235 killJob()236 public void killJob() throws IOException, InterruptedException { 237 job.killJob(); 238 } 239 failJob(String message)240 public synchronized void failJob(String message) throws IOException, InterruptedException { 241 try { 242 if(job != null && this.state == State.RUNNING) { 243 job.killJob(); 244 } 245 } finally { 246 this.state = State.FAILED; 247 this.message = message; 248 } 249 } 250 251 /** 252 * Check the state of this running job. The state may 253 * remain the same, become SUCCESS or FAILED. 254 */ checkRunningState()255 private void checkRunningState() throws IOException, InterruptedException { 256 try { 257 if (job.isComplete()) { 258 if (job.isSuccessful()) { 259 this.state = State.SUCCESS; 260 } else { 261 this.state = State.FAILED; 262 this.message = "Job failed!"; 263 } 264 } 265 } catch (IOException ioe) { 266 this.state = State.FAILED; 267 this.message = StringUtils.stringifyException(ioe); 268 try { 269 if (job != null) { 270 job.killJob(); 271 } 272 } catch (IOException e) {} 273 } 274 } 275 276 /** 277 * Check and update the state of this job. The state changes 278 * depending on its current state and the states of the depending jobs. 279 */ checkState()280 synchronized State checkState() throws IOException, InterruptedException { 281 if (this.state == State.RUNNING) { 282 checkRunningState(); 283 } 284 if (this.state != State.WAITING) { 285 return this.state; 286 } 287 if (this.dependingJobs == null || this.dependingJobs.size() == 0) { 288 this.state = State.READY; 289 return this.state; 290 } 291 ControlledJob pred = null; 292 int n = this.dependingJobs.size(); 293 for (int i = 0; i < n; i++) { 294 pred = this.dependingJobs.get(i); 295 State s = pred.checkState(); 296 if (s == State.WAITING || s == State.READY || s == State.RUNNING) { 297 break; // a pred is still not completed, continue in WAITING 298 // state 299 } 300 if (s == State.FAILED || s == State.DEPENDENT_FAILED) { 301 this.state = State.DEPENDENT_FAILED; 302 this.message = "depending job " + i + " with jobID " 303 + pred.getJobID() + " failed. " + pred.getMessage(); 304 break; 305 } 306 // pred must be in success state 307 if (i == n - 1) { 308 this.state = State.READY; 309 } 310 } 311 312 return this.state; 313 } 314 315 /** 316 * Submit this job to mapred. The state becomes RUNNING if submission 317 * is successful, FAILED otherwise. 318 */ submit()319 protected synchronized void submit() { 320 try { 321 Configuration conf = job.getConfiguration(); 322 if (conf.getBoolean(CREATE_DIR, false)) { 323 FileSystem fs = FileSystem.get(conf); 324 Path inputPaths[] = FileInputFormat.getInputPaths(job); 325 for (int i = 0; i < inputPaths.length; i++) { 326 if (!fs.exists(inputPaths[i])) { 327 try { 328 fs.mkdirs(inputPaths[i]); 329 } catch (IOException e) { 330 331 } 332 } 333 } 334 } 335 job.submit(); 336 this.state = State.RUNNING; 337 } catch (Exception ioe) { 338 LOG.info(getJobName()+" got an error while submitting ",ioe); 339 this.state = State.FAILED; 340 this.message = StringUtils.stringifyException(ioe); 341 } 342 } 343 344 } 345