1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.lib.jobcontrol; 20 21 import java.io.IOException; 22 import java.util.ArrayList; 23 import java.util.Collection; 24 import java.util.Iterator; 25 import java.util.LinkedList; 26 import java.util.List; 27 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import org.apache.hadoop.classification.InterfaceAudience; 31 import org.apache.hadoop.classification.InterfaceStability; 32 import org.apache.hadoop.mapred.jobcontrol.Job; 33 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State; 34 import org.apache.hadoop.util.StringUtils; 35 36 /** 37 * This class encapsulates a set of MapReduce jobs and its dependency. 38 * 39 * It tracks the states of the jobs by placing them into different tables 40 * according to their states. 41 * 42 * This class provides APIs for the client app to add a job to the group 43 * and to get the jobs in the group in different states. When a job is 44 * added, an ID unique to the group is assigned to the job. 45 * 46 * This class has a thread that submits jobs when they become ready, 47 * monitors the states of the running jobs, and updates the states of jobs 48 * based on the state changes of their depending jobs states. The class 49 * provides APIs for suspending/resuming the thread, and 50 * for stopping the thread. 51 * 52 */ 53 @InterfaceAudience.Public 54 @InterfaceStability.Evolving 55 public class JobControl implements Runnable { 56 private static final Log LOG = LogFactory.getLog(JobControl.class); 57 58 // The thread can be in one of the following state 59 public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY}; 60 61 private ThreadState runnerState; // the thread state 62 63 private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>(); 64 private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>(); 65 private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>(); 66 67 private long nextJobID; 68 private String groupName; 69 70 /** 71 * Construct a job control for a group of jobs. 72 * @param groupName a name identifying this group 73 */ JobControl(String groupName)74 public JobControl(String groupName) { 75 this.nextJobID = -1; 76 this.groupName = groupName; 77 this.runnerState = ThreadState.READY; 78 } 79 toList( LinkedList<ControlledJob> jobs)80 private static List<ControlledJob> toList( 81 LinkedList<ControlledJob> jobs) { 82 ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>(); 83 for (ControlledJob job : jobs) { 84 retv.add(job); 85 } 86 return retv; 87 } 88 getJobsIn(State state)89 synchronized private List<ControlledJob> getJobsIn(State state) { 90 LinkedList<ControlledJob> l = new LinkedList<ControlledJob>(); 91 for(ControlledJob j: jobsInProgress) { 92 if(j.getJobState() == state) { 93 l.add(j); 94 } 95 } 96 return l; 97 } 98 99 /** 100 * @return the jobs in the waiting state 101 */ getWaitingJobList()102 public List<ControlledJob> getWaitingJobList() { 103 return getJobsIn(State.WAITING); 104 } 105 106 /** 107 * @return the jobs in the running state 108 */ getRunningJobList()109 public List<ControlledJob> getRunningJobList() { 110 return getJobsIn(State.RUNNING); 111 } 112 113 /** 114 * @return the jobs in the ready state 115 */ getReadyJobsList()116 public List<ControlledJob> getReadyJobsList() { 117 return getJobsIn(State.READY); 118 } 119 120 /** 121 * @return the jobs in the success state 122 */ getSuccessfulJobList()123 synchronized public List<ControlledJob> getSuccessfulJobList() { 124 return toList(this.successfulJobs); 125 } 126 getFailedJobList()127 synchronized public List<ControlledJob> getFailedJobList() { 128 return toList(this.failedJobs); 129 } 130 getNextJobID()131 private String getNextJobID() { 132 nextJobID += 1; 133 return this.groupName + this.nextJobID; 134 } 135 136 /** 137 * Add a new controlled job. 138 * @param aJob the new controlled job 139 */ addJob(ControlledJob aJob)140 synchronized public String addJob(ControlledJob aJob) { 141 String id = this.getNextJobID(); 142 aJob.setJobID(id); 143 aJob.setJobState(State.WAITING); 144 jobsInProgress.add(aJob); 145 return id; 146 } 147 148 /** 149 * Add a new job. 150 * @param aJob the new job 151 */ addJob(Job aJob)152 synchronized public String addJob(Job aJob) { 153 return addJob((ControlledJob) aJob); 154 } 155 156 /** 157 * Add a collection of jobs 158 * 159 * @param jobs 160 */ addJobCollection(Collection<ControlledJob> jobs)161 public void addJobCollection(Collection<ControlledJob> jobs) { 162 for (ControlledJob job : jobs) { 163 addJob(job); 164 } 165 } 166 167 /** 168 * @return the thread state 169 */ getThreadState()170 public ThreadState getThreadState() { 171 return this.runnerState; 172 } 173 174 /** 175 * set the thread state to STOPPING so that the 176 * thread will stop when it wakes up. 177 */ stop()178 public void stop() { 179 this.runnerState = ThreadState.STOPPING; 180 } 181 182 /** 183 * suspend the running thread 184 */ suspend()185 public void suspend () { 186 if (this.runnerState == ThreadState.RUNNING) { 187 this.runnerState = ThreadState.SUSPENDED; 188 } 189 } 190 191 /** 192 * resume the suspended thread 193 */ resume()194 public void resume () { 195 if (this.runnerState == ThreadState.SUSPENDED) { 196 this.runnerState = ThreadState.RUNNING; 197 } 198 } 199 allFinished()200 synchronized public boolean allFinished() { 201 return jobsInProgress.isEmpty(); 202 } 203 204 /** 205 * The main loop for the thread. 206 * The loop does the following: 207 * Check the states of the running jobs 208 * Update the states of waiting jobs 209 * Submit the jobs in ready state 210 */ run()211 public void run() { 212 try { 213 this.runnerState = ThreadState.RUNNING; 214 while (true) { 215 while (this.runnerState == ThreadState.SUSPENDED) { 216 try { 217 Thread.sleep(5000); 218 } 219 catch (Exception e) { 220 //TODO the thread was interrupted, do something!!! 221 } 222 } 223 224 synchronized(this) { 225 Iterator<ControlledJob> it = jobsInProgress.iterator(); 226 while(it.hasNext()) { 227 ControlledJob j = it.next(); 228 LOG.debug("Checking state of job "+j); 229 switch(j.checkState()) { 230 case SUCCESS: 231 successfulJobs.add(j); 232 it.remove(); 233 break; 234 case FAILED: 235 case DEPENDENT_FAILED: 236 failedJobs.add(j); 237 it.remove(); 238 break; 239 case READY: 240 j.submit(); 241 break; 242 case RUNNING: 243 case WAITING: 244 //Do Nothing 245 break; 246 } 247 } 248 } 249 250 if (this.runnerState != ThreadState.RUNNING && 251 this.runnerState != ThreadState.SUSPENDED) { 252 break; 253 } 254 try { 255 Thread.sleep(5000); 256 } 257 catch (Exception e) { 258 //TODO the thread was interrupted, do something!!! 259 } 260 if (this.runnerState != ThreadState.RUNNING && 261 this.runnerState != ThreadState.SUSPENDED) { 262 break; 263 } 264 } 265 }catch(Throwable t) { 266 LOG.error("Error while trying to run jobs.",t); 267 //Mark all jobs as failed because we got something bad. 268 failAllJobs(t); 269 } 270 this.runnerState = ThreadState.STOPPED; 271 } 272 failAllJobs(Throwable t)273 synchronized private void failAllJobs(Throwable t) { 274 String message = "Unexpected System Error Occured: "+ 275 StringUtils.stringifyException(t); 276 Iterator<ControlledJob> it = jobsInProgress.iterator(); 277 while(it.hasNext()) { 278 ControlledJob j = it.next(); 279 try { 280 j.failJob(message); 281 } catch (IOException e) { 282 LOG.error("Error while tyring to clean up "+j.getJobName(), e); 283 } catch (InterruptedException e) { 284 LOG.error("Error while tyring to clean up "+j.getJobName(), e); 285 } finally { 286 failedJobs.add(j); 287 it.remove(); 288 } 289 } 290 } 291 } 292