/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 
19 package org.apache.hadoop.mapreduce.lib.jobcontrol;
20 
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27 
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.mapred.jobcontrol.Job;
33 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
34 import org.apache.hadoop.util.StringUtils;
35 
36 /**
37  *  This class encapsulates a set of MapReduce jobs and its dependency.
38  *
39  *  It tracks the states of the jobs by placing them into different tables
40  *  according to their states.
41  *
42  *  This class provides APIs for the client app to add a job to the group
43  *  and to get the jobs in the group in different states. When a job is
44  *  added, an ID unique to the group is assigned to the job.
45  *
46  *  This class has a thread that submits jobs when they become ready,
47  *  monitors the states of the running jobs, and updates the states of jobs
48  *  based on the state changes of their depending jobs states. The class
49  *  provides APIs for suspending/resuming the thread, and
50  *  for stopping the thread.
51  *
52  */
53 @InterfaceAudience.Public
54 @InterfaceStability.Evolving
55 public class JobControl implements Runnable {
56   private static final Log LOG = LogFactory.getLog(JobControl.class);
57 
58   // The thread can be in one of the following state
59   public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
60 
61   private ThreadState runnerState;			// the thread state
62 
63   private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
64   private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
65   private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
66 
67   private long nextJobID;
68   private String groupName;
69 
70   /**
71    * Construct a job control for a group of jobs.
72    * @param groupName a name identifying this group
73    */
JobControl(String groupName)74   public JobControl(String groupName) {
75     this.nextJobID = -1;
76     this.groupName = groupName;
77     this.runnerState = ThreadState.READY;
78   }
79 
toList( LinkedList<ControlledJob> jobs)80   private static List<ControlledJob> toList(
81                    LinkedList<ControlledJob> jobs) {
82     ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
83     for (ControlledJob job : jobs) {
84       retv.add(job);
85     }
86     return retv;
87   }
88 
getJobsIn(State state)89   synchronized private List<ControlledJob> getJobsIn(State state) {
90     LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
91     for(ControlledJob j: jobsInProgress) {
92       if(j.getJobState() == state) {
93         l.add(j);
94       }
95     }
96     return l;
97   }
98 
99   /**
100    * @return the jobs in the waiting state
101    */
getWaitingJobList()102   public List<ControlledJob> getWaitingJobList() {
103     return getJobsIn(State.WAITING);
104   }
105 
106   /**
107    * @return the jobs in the running state
108    */
getRunningJobList()109   public List<ControlledJob> getRunningJobList() {
110     return getJobsIn(State.RUNNING);
111   }
112 
113   /**
114    * @return the jobs in the ready state
115    */
getReadyJobsList()116   public List<ControlledJob> getReadyJobsList() {
117     return getJobsIn(State.READY);
118   }
119 
120   /**
121    * @return the jobs in the success state
122    */
getSuccessfulJobList()123   synchronized public List<ControlledJob> getSuccessfulJobList() {
124     return toList(this.successfulJobs);
125   }
126 
getFailedJobList()127   synchronized public List<ControlledJob> getFailedJobList() {
128     return toList(this.failedJobs);
129   }
130 
getNextJobID()131   private String getNextJobID() {
132     nextJobID += 1;
133     return this.groupName + this.nextJobID;
134   }
135 
136   /**
137    * Add a new controlled job.
138    * @param aJob the new controlled job
139    */
addJob(ControlledJob aJob)140   synchronized public String addJob(ControlledJob aJob) {
141     String id = this.getNextJobID();
142     aJob.setJobID(id);
143     aJob.setJobState(State.WAITING);
144     jobsInProgress.add(aJob);
145     return id;
146   }
147 
148   /**
149    * Add a new job.
150    * @param aJob the new job
151    */
addJob(Job aJob)152   synchronized public String addJob(Job aJob) {
153     return addJob((ControlledJob) aJob);
154   }
155 
156   /**
157    * Add a collection of jobs
158    *
159    * @param jobs
160    */
addJobCollection(Collection<ControlledJob> jobs)161   public void addJobCollection(Collection<ControlledJob> jobs) {
162     for (ControlledJob job : jobs) {
163       addJob(job);
164     }
165   }
166 
167   /**
168    * @return the thread state
169    */
getThreadState()170   public ThreadState getThreadState() {
171     return this.runnerState;
172   }
173 
174   /**
175    * set the thread state to STOPPING so that the
176    * thread will stop when it wakes up.
177    */
stop()178   public void stop() {
179     this.runnerState = ThreadState.STOPPING;
180   }
181 
182   /**
183    * suspend the running thread
184    */
suspend()185   public void suspend () {
186     if (this.runnerState == ThreadState.RUNNING) {
187       this.runnerState = ThreadState.SUSPENDED;
188     }
189   }
190 
191   /**
192    * resume the suspended thread
193    */
resume()194   public void resume () {
195     if (this.runnerState == ThreadState.SUSPENDED) {
196       this.runnerState = ThreadState.RUNNING;
197     }
198   }
199 
allFinished()200   synchronized public boolean allFinished() {
201     return jobsInProgress.isEmpty();
202   }
203 
204   /**
205    *  The main loop for the thread.
206    *  The loop does the following:
207    *  	Check the states of the running jobs
208    *  	Update the states of waiting jobs
209    *  	Submit the jobs in ready state
210    */
run()211   public void run() {
212     try {
213       this.runnerState = ThreadState.RUNNING;
214       while (true) {
215         while (this.runnerState == ThreadState.SUSPENDED) {
216           try {
217             Thread.sleep(5000);
218           }
219           catch (Exception e) {
220             //TODO the thread was interrupted, do something!!!
221           }
222         }
223 
224         synchronized(this) {
225           Iterator<ControlledJob> it = jobsInProgress.iterator();
226           while(it.hasNext()) {
227             ControlledJob j = it.next();
228             LOG.debug("Checking state of job "+j);
229             switch(j.checkState()) {
230             case SUCCESS:
231               successfulJobs.add(j);
232               it.remove();
233               break;
234             case FAILED:
235             case DEPENDENT_FAILED:
236               failedJobs.add(j);
237               it.remove();
238               break;
239             case READY:
240               j.submit();
241               break;
242             case RUNNING:
243             case WAITING:
244               //Do Nothing
245               break;
246             }
247           }
248         }
249 
250         if (this.runnerState != ThreadState.RUNNING &&
251             this.runnerState != ThreadState.SUSPENDED) {
252           break;
253         }
254         try {
255           Thread.sleep(5000);
256         }
257         catch (Exception e) {
258           //TODO the thread was interrupted, do something!!!
259         }
260         if (this.runnerState != ThreadState.RUNNING &&
261             this.runnerState != ThreadState.SUSPENDED) {
262           break;
263         }
264       }
265     }catch(Throwable t) {
266       LOG.error("Error while trying to run jobs.",t);
267       //Mark all jobs as failed because we got something bad.
268       failAllJobs(t);
269     }
270     this.runnerState = ThreadState.STOPPED;
271   }
272 
failAllJobs(Throwable t)273   synchronized private void failAllJobs(Throwable t) {
274     String message = "Unexpected System Error Occured: "+
275     StringUtils.stringifyException(t);
276     Iterator<ControlledJob> it = jobsInProgress.iterator();
277     while(it.hasNext()) {
278       ControlledJob j = it.next();
279       try {
280         j.failJob(message);
281       } catch (IOException e) {
282         LOG.error("Error while tyring to clean up "+j.getJobName(), e);
283       } catch (InterruptedException e) {
284         LOG.error("Error while tyring to clean up "+j.getJobName(), e);
285       } finally {
286         failedJobs.add(j);
287         it.remove();
288       }
289     }
290   }
291 }
292