/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 
19 package org.apache.hadoop.mapreduce.lib.jobcontrol;
20 
21 
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.mapreduce.Job;
34 import org.apache.hadoop.mapreduce.JobID;
35 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
36 import org.apache.hadoop.util.StringUtils;
37 
38 /**
39  *  This class encapsulates a MapReduce job and its dependency. It monitors
40  *  the states of the depending jobs and updates the state of this job.
41  *  A job starts in the WAITING state. If it does not have any depending jobs,
42  *  or all of the depending jobs are in SUCCESS state, then the job state
43  *  will become READY. If any depending jobs fail, the job will fail too.
44  *  When in READY state, the job can be submitted to Hadoop for execution, with
45  *  the state changing into RUNNING state. From RUNNING state, the job
46  *  can get into SUCCESS or FAILED state, depending
47  *  the status of the job execution.
48  */
49 @InterfaceAudience.Public
50 @InterfaceStability.Evolving
51 public class ControlledJob {
52   private static final Log LOG = LogFactory.getLog(ControlledJob.class);
53 
54   // A job will be in one of the following states
55   public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
56                             DEPENDENT_FAILED};
57   public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
58   private State state;
59   private String controlID;     // assigned and used by JobControl class
60   private Job job;               // mapreduce job to be executed.
61   // some info for human consumption, e.g. the reason why the job failed
62   private String message;
63   // the jobs the current job depends on
64   private List<ControlledJob> dependingJobs;
65 
66   /**
67    * Construct a job.
68    * @param job a mapreduce job to be executed.
69    * @param dependingJobs an array of jobs the current job depends on
70    */
ControlledJob(Job job, List<ControlledJob> dependingJobs)71   public ControlledJob(Job job, List<ControlledJob> dependingJobs)
72       throws IOException {
73     this.job = job;
74     this.dependingJobs = dependingJobs;
75     this.state = State.WAITING;
76     this.controlID = "unassigned";
77     this.message = "just initialized";
78   }
79 
80   /**
81    * Construct a job.
82    *
83    * @param conf mapred job configuration representing a job to be executed.
84    * @throws IOException
85    */
ControlledJob(Configuration conf)86   public ControlledJob(Configuration conf) throws IOException {
87     this(Job.getInstance(conf), null);
88   }
89 
90   @Override
toString()91   public String toString() {
92     StringBuffer sb = new StringBuffer();
93     sb.append("job name:\t").append(this.job.getJobName()).append("\n");
94     sb.append("job id:\t").append(this.controlID).append("\n");
95     sb.append("job state:\t").append(this.state).append("\n");
96     sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
97     sb.append("job message:\t").append(this.message).append("\n");
98 
99     if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
100       sb.append("job has no depending job:\t").append("\n");
101     } else {
102       sb.append("job has ").append(this.dependingJobs.size()).
103          append(" dependeng jobs:\n");
104       for (int i = 0; i < this.dependingJobs.size(); i++) {
105         sb.append("\t depending job ").append(i).append(":\t");
106         sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
107       }
108     }
109     return sb.toString();
110   }
111 
112   /**
113    * @return the job name of this job
114    */
getJobName()115   public String getJobName() {
116     return job.getJobName();
117   }
118 
119   /**
120    * Set the job name for  this job.
121    * @param jobName the job name
122    */
setJobName(String jobName)123   public void setJobName(String jobName) {
124     job.setJobName(jobName);
125   }
126 
127   /**
128    * @return the job ID of this job assigned by JobControl
129    */
getJobID()130   public String getJobID() {
131     return this.controlID;
132   }
133 
134   /**
135    * Set the job ID for  this job.
136    * @param id the job ID
137    */
setJobID(String id)138   public void setJobID(String id) {
139     this.controlID = id;
140   }
141 
142   /**
143    * @return the mapred ID of this job as assigned by the mapred framework.
144    */
getMapredJobId()145   public synchronized JobID getMapredJobId() {
146     return this.job.getJobID();
147   }
148 
149   /**
150    * @return the mapreduce job
151    */
getJob()152   public synchronized Job getJob() {
153     return this.job;
154   }
155 
156   /**
157    * Set the mapreduce job
158    * @param job the mapreduce job for this job.
159    */
setJob(Job job)160   public synchronized void setJob(Job job) {
161     this.job = job;
162   }
163 
164   /**
165    * @return the state of this job
166    */
getJobState()167   public synchronized State getJobState() {
168     return this.state;
169   }
170 
171   /**
172    * Set the state for this job.
173    * @param state the new state for this job.
174    */
setJobState(State state)175   protected synchronized void setJobState(State state) {
176     this.state = state;
177   }
178 
179   /**
180    * @return the message of this job
181    */
getMessage()182   public synchronized String getMessage() {
183     return this.message;
184   }
185 
186   /**
187    * Set the message for this job.
188    * @param message the message for this job.
189    */
setMessage(String message)190   public synchronized void setMessage(String message) {
191     this.message = message;
192   }
193 
194   /**
195    * @return the depending jobs of this job
196    */
getDependentJobs()197   public List<ControlledJob> getDependentJobs() {
198     return this.dependingJobs;
199   }
200 
201   /**
202    * Add a job to this jobs' dependency list.
203    * Dependent jobs can only be added while a Job
204    * is waiting to run, not during or afterwards.
205    *
206    * @param dependingJob Job that this Job depends on.
207    * @return <tt>true</tt> if the Job was added.
208    */
addDependingJob(ControlledJob dependingJob)209   public synchronized boolean addDependingJob(ControlledJob dependingJob) {
210     if (this.state == State.WAITING) { //only allowed to add jobs when waiting
211       if (this.dependingJobs == null) {
212         this.dependingJobs = new ArrayList<ControlledJob>();
213       }
214       return this.dependingJobs.add(dependingJob);
215     } else {
216       return false;
217     }
218   }
219 
220   /**
221    * @return true if this job is in a complete state
222    */
isCompleted()223   public synchronized boolean isCompleted() {
224     return this.state == State.FAILED ||
225       this.state == State.DEPENDENT_FAILED ||
226       this.state == State.SUCCESS;
227   }
228 
229   /**
230    * @return true if this job is in READY state
231    */
isReady()232   public synchronized boolean isReady() {
233     return this.state == State.READY;
234   }
235 
killJob()236   public void killJob() throws IOException, InterruptedException {
237     job.killJob();
238   }
239 
failJob(String message)240   public synchronized void failJob(String message) throws IOException, InterruptedException {
241     try {
242       if(job != null && this.state == State.RUNNING) {
243         job.killJob();
244       }
245     } finally {
246       this.state = State.FAILED;
247       this.message = message;
248     }
249   }
250 
251   /**
252    * Check the state of this running job. The state may
253    * remain the same, become SUCCESS or FAILED.
254    */
checkRunningState()255   private void checkRunningState() throws IOException, InterruptedException {
256     try {
257       if (job.isComplete()) {
258         if (job.isSuccessful()) {
259           this.state = State.SUCCESS;
260         } else {
261           this.state = State.FAILED;
262           this.message = "Job failed!";
263         }
264       }
265     } catch (IOException ioe) {
266       this.state = State.FAILED;
267       this.message = StringUtils.stringifyException(ioe);
268       try {
269         if (job != null) {
270           job.killJob();
271         }
272       } catch (IOException e) {}
273     }
274   }
275 
276   /**
277    * Check and update the state of this job. The state changes
278    * depending on its current state and the states of the depending jobs.
279    */
checkState()280    synchronized State checkState() throws IOException, InterruptedException {
281     if (this.state == State.RUNNING) {
282       checkRunningState();
283     }
284     if (this.state != State.WAITING) {
285       return this.state;
286     }
287     if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
288       this.state = State.READY;
289       return this.state;
290     }
291     ControlledJob pred = null;
292     int n = this.dependingJobs.size();
293     for (int i = 0; i < n; i++) {
294       pred = this.dependingJobs.get(i);
295       State s = pred.checkState();
296       if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
297         break; // a pred is still not completed, continue in WAITING
298         // state
299       }
300       if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
301         this.state = State.DEPENDENT_FAILED;
302         this.message = "depending job " + i + " with jobID "
303           + pred.getJobID() + " failed. " + pred.getMessage();
304         break;
305       }
306       // pred must be in success state
307       if (i == n - 1) {
308         this.state = State.READY;
309       }
310     }
311 
312     return this.state;
313   }
314 
315   /**
316    * Submit this job to mapred. The state becomes RUNNING if submission
317    * is successful, FAILED otherwise.
318    */
submit()319   protected synchronized void submit() {
320     try {
321       Configuration conf = job.getConfiguration();
322       if (conf.getBoolean(CREATE_DIR, false)) {
323         FileSystem fs = FileSystem.get(conf);
324         Path inputPaths[] = FileInputFormat.getInputPaths(job);
325         for (int i = 0; i < inputPaths.length; i++) {
326           if (!fs.exists(inputPaths[i])) {
327             try {
328               fs.mkdirs(inputPaths[i]);
329             } catch (IOException e) {
330 
331             }
332           }
333         }
334       }
335       job.submit();
336       this.state = State.RUNNING;
337     } catch (Exception ioe) {
338       LOG.info(getJobName()+" got an error while submitting ",ioe);
339       this.state = State.FAILED;
340       this.message = StringUtils.stringifyException(ioe);
341     }
342   }
343 
344 }
345