/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.app.job.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobQueueChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobCommitEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobSetupEvent;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/** Implementation of the Job interface. Maintains the state machine of a Job.
 * Read and write calls use a ReadWriteLock for concurrency.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
  EventHandler<JobEvent> {

  private static final TaskAttemptCompletionEvent[]
    EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];

  private static final TaskCompletionEvent[]
    EMPTY_TASK_COMPLETION_EVENTS = new TaskCompletionEvent[0];

  private static final Log LOG = LogFactory.getLog(JobImpl.class);

  // The maximum fraction of fetch failures allowed for a map task
  private float maxAllowedFetchFailuresFraction;

  // Maximum number of fetch-failure notifications after which a map task
  // is declared failed
  private int maxFetchFailuresNotifications;
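
  // Illustrative sketch (an assumption about code elsewhere in this class,
  // not shown in this section): the fetch-failure transition typically
  // combines the two knobs above along these lines, with hypothetical
  // local names:
  //   float failureRate = runningReduces == 0
  //       ? 1.0f : (float) fetchFailures / runningReduces;
  //   boolean mapTooFaulty =
  //       fetchFailures >= maxFetchFailuresNotifications
  //       && failureRate >= maxAllowedFetchFailuresFraction;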

  public static final String JOB_KILLED_DIAG =
      "Job received Kill while in RUNNING state.";

  // final fields
  private final ApplicationAttemptId applicationAttemptId;
  private final Clock clock;
  private final JobACLsManager aclsManager;
  private final String username;
  private final Map<JobACL, AccessControlList> jobACLs;
  private float setupWeight = 0.05f;
  private float cleanupWeight = 0.05f;
  private float mapWeight = 0.0f;
  private float reduceWeight = 0.0f;
  private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
  private final List<AMInfo> amInfos;
  private final Lock readLock;
  private final Lock writeLock;
  private final JobId jobId;
  private final String jobName;
  private final OutputCommitter committer;
  private final boolean newApiCommitter;
  private final org.apache.hadoop.mapreduce.JobID oldJobId;
  private final TaskAttemptListener taskAttemptListener;
  private final Object tasksSyncHandle = new Object();
  private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
  private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
  /**
   * Maps nodes to the task attempts that succeeded on them.
   */
  private final HashMap<NodeId, List<TaskAttemptId>>
    nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TaskAttemptId>>();

  private final EventHandler eventHandler;
  private final MRAppMetrics metrics;
  private final String userName;
  private String queueName;
  private final long appSubmitTime;
  private final AppContext appContext;

  private boolean lazyTasksCopyNeeded = false;
  volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
  private Counters jobCounters = new Counters();
  private Object fullCountersLock = new Object();
  private Counters fullCounters = null;
  private Counters finalMapCounters = null;
  private Counters finalReduceCounters = null;

  // FIXME:
  //
  // Can then replace task-level uber counters (MR-2424) with job-level ones
  // sent from LocalContainerLauncher, eventually including a count of
  // uber-AM attempts (probably sent from MRAppMaster).
  public JobConf conf;

  // fields initialized in init
  private FileSystem fs;
  private Path remoteJobSubmitDir;
  public Path remoteJobConfFile;
  private JobContext jobContext;
  private int allowedMapFailuresPercent = 0;
  private int allowedReduceFailuresPercent = 0;
  private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
  private List<TaskCompletionEvent> mapAttemptCompletionEvents;
  private List<Integer> taskCompletionIdxToMapCompletionIdx;
  private final List<String> diagnostics = new ArrayList<String>();

  // task/attempt related datastructures
  private final Map<TaskId, Integer> successAttemptCompletionEventNoMap =
    new HashMap<TaskId, Integer>();
  private final Map<TaskAttemptId, Integer> fetchFailuresMapping =
    new HashMap<TaskAttemptId, Integer>();

  private static final DiagnosticsUpdateTransition
      DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
  private static final InternalErrorTransition
      INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
  private static final InternalRebootTransition
      INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition();
  private static final TaskAttemptCompletedEventTransition
      TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
          new TaskAttemptCompletedEventTransition();
  private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
      new CounterUpdateTransition();
  private static final UpdatedNodesTransition UPDATED_NODES_TRANSITION =
      new UpdatedNodesTransition();

  protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
       stateMachineFactory
     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
              (JobStateInternal.NEW)

          // Transitions from NEW state
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition
              (JobStateInternal.NEW,
              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
              JobEventType.JOB_INIT,
              new InitTransition())
          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_INIT_FAILED,
              new InitFailedTransition())
          .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
              JobEventType.JOB_KILL,
              new KillNewJobTransition())
          .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
              JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
              JobEventType.JOB_AM_REBOOT,
              INTERNAL_REBOOT_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_UPDATED_NODES)

          // Transitions from INITED state
          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
              JobEventType.JOB_START,
              new StartTransition())
          .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
              JobEventType.JOB_KILL,
              new KillInitedJobTransition())
          .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
              JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT,
              JobEventType.JOB_AM_REBOOT,
              INTERNAL_REBOOT_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
              JobEventType.JOB_UPDATED_NODES)

          // Transitions from SETUP state
          .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
              JobEventType.JOB_SETUP_COMPLETED,
              new SetupCompletedTransition())
          .addTransition(JobStateInternal.SETUP, JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_SETUP_FAILED,
              new SetupFailedTransition())
          .addTransition(JobStateInternal.SETUP, JobStateInternal.KILL_ABORT,
              JobEventType.JOB_KILL,
              new KilledDuringSetupTransition())
          .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
              JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT,
              JobEventType.JOB_AM_REBOOT,
              INTERNAL_REBOOT_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
              JobEventType.JOB_UPDATED_NODES)

          // Transitions from RUNNING state
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
              JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          .addTransition
              (JobStateInternal.RUNNING,
              EnumSet.of(JobStateInternal.RUNNING,
                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
                  JobStateInternal.FAIL_ABORT),
              JobEventType.JOB_TASK_COMPLETED,
              new TaskCompletedTransition())
          .addTransition
              (JobStateInternal.RUNNING,
              EnumSet.of(JobStateInternal.RUNNING,
                  JobStateInternal.COMMITTING),
              JobEventType.JOB_COMPLETED,
              new JobNoTasksCompletedTransition())
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
              JobEventType.JOB_KILL, new KillTasksTransition())
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
              JobEventType.JOB_UPDATED_NODES,
              UPDATED_NODES_TRANSITION)
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
              JobEventType.JOB_MAP_TASK_RESCHEDULED,
              new MapTaskRescheduledTransition())
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
              JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
              new TaskAttemptFetchFailureTransition())
          .addTransition(
              JobStateInternal.RUNNING,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT,
              JobEventType.JOB_AM_REBOOT,
              INTERNAL_REBOOT_TRANSITION)

          // Transitions from KILL_WAIT state.
          .addTransition
              (JobStateInternal.KILL_WAIT,
              EnumSet.of(JobStateInternal.KILL_WAIT,
                  JobStateInternal.KILL_ABORT),
              JobEventType.JOB_TASK_COMPLETED,
              new KillWaitTaskCompletedTransition())
          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
              JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
              TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              JobStateInternal.KILL_WAIT,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
              EnumSet.of(JobEventType.JOB_KILL,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_AM_REBOOT))

          // Transitions from COMMITTING state
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.SUCCEEDED,
              JobEventType.JOB_COMMIT_COMPLETED,
              new CommitSucceededTransition())
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_COMMIT_FAILED,
              new CommitFailedTransition())
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.KILL_ABORT,
              JobEventType.JOB_KILL,
              new KilledDuringCommitTransition())
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.COMMITTING,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.COMMITTING,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT,
              JobEventType.JOB_AM_REBOOT,
              INTERNAL_REBOOT_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.COMMITTING,
              JobStateInternal.COMMITTING,
              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))

          // Transitions from SUCCEEDED state
          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              JobStateInternal.SUCCEEDED,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
              EnumSet.of(JobEventType.JOB_KILL,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_AM_REBOOT,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED))

          // Transitions from FAIL_WAIT state
          .addTransition(JobStateInternal.FAIL_WAIT,
              JobStateInternal.FAIL_WAIT,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.FAIL_WAIT,
              JobStateInternal.FAIL_WAIT,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.FAIL_WAIT,
              EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
              JobEventType.JOB_TASK_COMPLETED,
              new JobFailWaitTransition())
          .addTransition(JobStateInternal.FAIL_WAIT,
              JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
              new JobFailWaitTimedOutTransition())
          .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
              JobEventType.JOB_KILL,
              new KilledDuringAbortTransition())
          .addTransition(JobStateInternal.FAIL_WAIT,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.FAIL_WAIT,
              JobStateInternal.FAIL_WAIT,
              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_AM_REBOOT))

          // Transitions from FAIL_ABORT state
          .addTransition(JobStateInternal.FAIL_ABORT,
              JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.FAIL_ABORT,
              JobStateInternal.FAIL_ABORT,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED,
              JobEventType.JOB_ABORT_COMPLETED,
              new JobAbortCompletedTransition())
          .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.KILLED,
              JobEventType.JOB_KILL,
              new KilledDuringAbortTransition())
          .addTransition(JobStateInternal.FAIL_ABORT,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.FAIL_ABORT,
              JobStateInternal.FAIL_ABORT,
              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_COMPLETED,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_COMMIT_COMPLETED,
                  JobEventType.JOB_COMMIT_FAILED,
                  JobEventType.JOB_AM_REBOOT,
                  JobEventType.JOB_FAIL_WAIT_TIMEDOUT))

          // Transitions from KILL_ABORT state
          .addTransition(JobStateInternal.KILL_ABORT,
              JobStateInternal.KILL_ABORT,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.KILL_ABORT,
              JobStateInternal.KILL_ABORT,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED,
              JobEventType.JOB_ABORT_COMPLETED,
              new JobAbortCompletedTransition())
          .addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED,
              JobEventType.JOB_KILL,
              new KilledDuringAbortTransition())
          .addTransition(JobStateInternal.KILL_ABORT,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.KILL_ABORT,
              JobStateInternal.KILL_ABORT,
              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_SETUP_COMPLETED,
                  JobEventType.JOB_SETUP_FAILED,
                  JobEventType.JOB_COMMIT_COMPLETED,
                  JobEventType.JOB_COMMIT_FAILED,
                  JobEventType.JOB_AM_REBOOT))

          // Transitions from FAILED state
          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              JobStateInternal.FAILED,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
              EnumSet.of(JobEventType.JOB_KILL,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_COMPLETED,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_SETUP_COMPLETED,
                  JobEventType.JOB_SETUP_FAILED,
                  JobEventType.JOB_COMMIT_COMPLETED,
                  JobEventType.JOB_COMMIT_FAILED,
                  JobEventType.JOB_ABORT_COMPLETED,
                  JobEventType.JOB_AM_REBOOT))

          // Transitions from KILLED state
          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition(
              JobStateInternal.KILLED,
              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
              INTERNAL_ERROR_TRANSITION)
          // Ignore-able events
          .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
              EnumSet.of(JobEventType.JOB_KILL,
                  JobEventType.JOB_START,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_SETUP_COMPLETED,
                  JobEventType.JOB_SETUP_FAILED,
                  JobEventType.JOB_COMMIT_COMPLETED,
                  JobEventType.JOB_COMMIT_FAILED,
                  JobEventType.JOB_ABORT_COMPLETED,
                  JobEventType.JOB_AM_REBOOT))

          // No transitions from INTERNAL_ERROR state. Ignore all.
          .addTransition(
              JobStateInternal.ERROR,
              JobStateInternal.ERROR,
              EnumSet.of(JobEventType.JOB_INIT,
                  JobEventType.JOB_KILL,
                  JobEventType.JOB_TASK_COMPLETED,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_DIAGNOSTIC_UPDATE,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_SETUP_COMPLETED,
                  JobEventType.JOB_SETUP_FAILED,
                  JobEventType.JOB_COMMIT_COMPLETED,
                  JobEventType.JOB_COMMIT_FAILED,
                  JobEventType.JOB_ABORT_COMPLETED,
                  JobEventType.INTERNAL_ERROR,
                  JobEventType.JOB_AM_REBOOT))
          .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)

          // No transitions from AM_REBOOT state. Ignore all.
          .addTransition(
              JobStateInternal.REBOOT,
              JobStateInternal.REBOOT,
              EnumSet.of(JobEventType.JOB_INIT,
                  JobEventType.JOB_KILL,
                  JobEventType.JOB_TASK_COMPLETED,
                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                  JobEventType.JOB_DIAGNOSTIC_UPDATE,
                  JobEventType.JOB_UPDATED_NODES,
                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                  JobEventType.JOB_SETUP_COMPLETED,
                  JobEventType.JOB_SETUP_FAILED,
                  JobEventType.JOB_COMMIT_COMPLETED,
                  JobEventType.JOB_COMMIT_FAILED,
                  JobEventType.JOB_ABORT_COMPLETED,
                  JobEventType.INTERNAL_ERROR,
                  JobEventType.JOB_AM_REBOOT))
          .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)

          // create the topology tables
          .installTopology();
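
  // Note: this shared, static factory stamps out one state machine per
  // JobImpl instance (see stateMachineFactory.make(this) in the constructor
  // below), and incoming events then drive transitions, roughly:
  //   getStateMachine().doTransition(event.getType(), event);
  // as done in handle(JobEvent) further down.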

  private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;

  // fields that change while the job is running
  private int numMapTasks;
  private int numReduceTasks;
  private int completedTaskCount = 0;
  private int succeededMapTaskCount = 0;
  private int succeededReduceTaskCount = 0;
  private int failedMapTaskCount = 0;
  private int failedReduceTaskCount = 0;
  private int killedMapTaskCount = 0;
  private int killedReduceTaskCount = 0;
  private long startTime;
  private long finishTime;
  private float setupProgress;
  private float mapProgress;
  private float reduceProgress;
  private float cleanupProgress;
  private boolean isUber = false;

  private Credentials jobCredentials;
  private Token<JobTokenIdentifier> jobToken;
  private JobTokenSecretManager jobTokenSecretManager;

  private JobStateInternal forcedState = null;

  // Executor used to schedule the fail-wait timeout task.
  private ScheduledThreadPoolExecutor executor;
  private ScheduledFuture failWaitTriggerScheduledFuture;

  private JobState lastNonFinalState = JobState.NEW;

  public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
      Configuration conf, EventHandler eventHandler,
      TaskAttemptListener taskAttemptListener,
      JobTokenSecretManager jobTokenSecretManager,
      Credentials jobCredentials, Clock clock,
      Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
      OutputCommitter committer, boolean newApiCommitter, String userName,
      long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
      JobStateInternal forcedState, String forcedDiagnostic) {
    this.applicationAttemptId = applicationAttemptId;
    this.jobId = jobId;
    this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
    this.conf = new JobConf(conf);
    this.metrics = metrics;
    this.clock = clock;
    this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
    this.amInfos = amInfos;
    this.appContext = appContext;
    this.userName = userName;
    this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
    this.appSubmitTime = appSubmitTime;
    this.oldJobId = TypeConverter.fromYarn(jobId);
    this.committer = committer;
    this.newApiCommitter = newApiCommitter;

    this.taskAttemptListener = taskAttemptListener;
    this.eventHandler = eventHandler;
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();

    this.jobCredentials = jobCredentials;
    this.jobTokenSecretManager = jobTokenSecretManager;

    this.aclsManager = new JobACLsManager(conf);
    this.username = System.getProperty("user.name");
    this.jobACLs = aclsManager.constructJobACLs(conf);

    ThreadFactory threadFactory = new ThreadFactoryBuilder()
      .setNameFormat("Job Fail Wait Timeout Monitor #%d")
      .setDaemon(true)
      .build();
    this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);

    // This "this leak" is okay because the retained pointer is in an
    //  instance variable.
    stateMachine = stateMachineFactory.make(this);
    this.forcedState = forcedState;
    if (forcedDiagnostic != null) {
      this.diagnostics.add(forcedDiagnostic);
    }

    this.maxAllowedFetchFailuresFraction = conf.getFloat(
        MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
        MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
    this.maxFetchFailuresNotifications = conf.getInt(
        MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
        MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
  }

  protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
    return stateMachine;
  }

  @Override
  public JobId getID() {
    return jobId;
  }

  EventHandler getEventHandler() {
    return this.eventHandler;
  }

  JobContext getJobContext() {
    return this.jobContext;
  }

  @Override
  public boolean checkAccess(UserGroupInformation callerUGI,
      JobACL jobOperation) {
    AccessControlList jobACL = jobACLs.get(jobOperation);
    if (jobACL == null) {
      return true;
    }
    return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL);
  }
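
  // Illustrative usage from a hypothetical caller (names assumed for
  // exposition; UserGroupInformation.getCurrentUser() can throw IOException):
  //   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  //   boolean mayView = job.checkAccess(ugi, JobACL.VIEW_JOB);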

  @Override
  public Task getTask(TaskId taskID) {
    readLock.lock();
    try {
      return tasks.get(taskID);
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public int getCompletedMaps() {
    readLock.lock();
    try {
      return succeededMapTaskCount + failedMapTaskCount + killedMapTaskCount;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public int getCompletedReduces() {
    readLock.lock();
    try {
      return succeededReduceTaskCount + failedReduceTaskCount
                  + killedReduceTaskCount;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public boolean isUber() {
    return isUber;
  }

  @Override
  public Counters getAllCounters() {

    readLock.lock();

    try {
      JobStateInternal state = getInternalState();
      if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED
          || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) {
        this.mayBeConstructFinalFullCounters();
        return fullCounters;
      }

      Counters counters = new Counters();
      counters.incrAllCounters(jobCounters);
      return incrTaskCounters(counters, tasks.values());

    } finally {
      readLock.unlock();
    }
  }

  public static Counters incrTaskCounters(
      Counters counters, Collection<Task> tasks) {
    for (Task task : tasks) {
      counters.incrAllCounters(task.getCounters());
    }
    return counters;
  }

  @Override
  public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
      int fromEventId, int maxEvents) {
    TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
    readLock.lock();
    try {
      if (taskAttemptCompletionEvents.size() > fromEventId) {
        int actualMax = Math.min(maxEvents,
            (taskAttemptCompletionEvents.size() - fromEventId));
        events = taskAttemptCompletionEvents.subList(fromEventId,
            actualMax + fromEventId).toArray(events);
      }
      return events;
    } finally {
      readLock.unlock();
    }
  }
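
  // Illustrative paging usage (hypothetical caller): poll with a moving
  // offset until no new events arrive, e.g.
  //   TaskAttemptCompletionEvent[] batch =
  //       job.getTaskAttemptCompletionEvents(fromEventId, 100);
  //   fromEventId += batch.length;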

  @Override
  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
      int startIndex, int maxEvents) {
    TaskCompletionEvent[] events = EMPTY_TASK_COMPLETION_EVENTS;
    readLock.lock();
    try {
      if (mapAttemptCompletionEvents.size() > startIndex) {
        int actualMax = Math.min(maxEvents,
            (mapAttemptCompletionEvents.size() - startIndex));
        events = mapAttemptCompletionEvents.subList(startIndex,
            actualMax + startIndex).toArray(events);
      }
      return events;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public List<String> getDiagnostics() {
    readLock.lock();
    try {
      return diagnostics;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public JobReport getReport() {
    readLock.lock();
    try {
      JobState state = getState();

      // jobFile can be null if the job is not yet inited.
      String jobFile =
          remoteJobConfFile == null ? "" : remoteJobConfFile.toString();

      StringBuilder diagsb = new StringBuilder();
      for (String s : getDiagnostics()) {
        diagsb.append(s).append("\n");
      }

      if (getInternalState() == JobStateInternal.NEW) {
        return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
            appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
            cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
      }

      computeProgress();
      JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username,
          state, appSubmitTime, startTime, finishTime, setupProgress,
          this.mapProgress, this.reduceProgress,
          cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
      return report;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public float getProgress() {
    this.readLock.lock();
    try {
      computeProgress();
      return (this.setupProgress * this.setupWeight + this.cleanupProgress
          * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress
          * this.reduceWeight);
    } finally {
      this.readLock.unlock();
    }
  }
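
  // Worked example (illustrative numbers): with setupWeight = cleanupWeight
  // = 0.05 and, say, mapWeight = 0.45 and reduceWeight = 0.45 (the map and
  // reduce weights are set elsewhere during job init), a job whose setup is
  // done (1.0), whose maps are at 50%, and whose reduces have not started
  // reports 1.0*0.05 + 0.0*0.05 + 0.5*0.45 + 0.0*0.45 = 0.275.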

  private void computeProgress() {
    this.readLock.lock();
    try {
      float mapProgress = 0f;
      float reduceProgress = 0f;
      for (Task task : this.tasks.values()) {
        if (task.getType() == TaskType.MAP) {
          mapProgress += (task.isFinished() ? 1f : task.getProgress());
        } else {
          reduceProgress += (task.isFinished() ? 1f : task.getProgress());
        }
      }
      if (this.numMapTasks != 0) {
        mapProgress = mapProgress / this.numMapTasks;
      }
      if (this.numReduceTasks != 0) {
        reduceProgress = reduceProgress / this.numReduceTasks;
      }
      this.mapProgress = mapProgress;
      this.reduceProgress = reduceProgress;
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public Map<TaskId, Task> getTasks() {
    synchronized (tasksSyncHandle) {
      lazyTasksCopyNeeded = true;
      return Collections.unmodifiableMap(tasks);
    }
  }

  @Override
  public Map<TaskId, Task> getTasks(TaskType taskType) {
    Map<TaskId, Task> localTasksCopy = tasks;
    Map<TaskId, Task> result = new HashMap<TaskId, Task>();
    Set<TaskId> tasksOfGivenType = null;
    readLock.lock();
    try {
      if (TaskType.MAP == taskType) {
        tasksOfGivenType = mapTasks;
      } else {
        tasksOfGivenType = reduceTasks;
      }
      for (TaskId taskID : tasksOfGivenType) {
        result.put(taskID, localTasksCopy.get(taskID));
      }
      return result;
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public JobState getState() {
    readLock.lock();
    try {
      JobState state = getExternalState(getInternalState());
      if (!appContext.hasSuccessfullyUnregistered()
          && (state == JobState.SUCCEEDED || state == JobState.FAILED
          || state == JobState.KILLED || state == JobState.ERROR)) {
        return lastNonFinalState;
      } else {
        return state;
      }
    } finally {
      readLock.unlock();
    }
  }

  protected void scheduleTasks(Set<TaskId> taskIDs,
      boolean recoverTaskOutput) {
    for (TaskId taskID : taskIDs) {
      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
      if (taskInfo != null) {
        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
            committer, recoverTaskOutput));
      } else {
        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
      }
    }
  }

  /**
   * The only entry point to change the Job.
   */
  @Override
  public void handle(JobEvent event) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing " + event.getJobId() + " of type "
          + event.getType());
    }
    writeLock.lock();
    try {
      JobStateInternal oldState = getInternalState();
      try {
        getStateMachine().doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state", e);
        addDiagnostic("Invalid event " + event.getType() +
            " on Job " + this.jobId);
        eventHandler.handle(new JobEvent(this.jobId,
            JobEventType.INTERNAL_ERROR));
      }
      // notify the eventhandler of state change
      if (oldState != getInternalState()) {
        LOG.info(jobId + " Job Transitioned from " + oldState + " to "
                 + getInternalState());
        rememberLastNonFinalState(oldState);
      }
    } finally {
      writeLock.unlock();
    }
  }

  private void rememberLastNonFinalState(JobStateInternal stateInternal) {
    JobState state = getExternalState(stateInternal);
    // if state is not a final state, remember it as lastNonFinalState
    if (state != JobState.SUCCEEDED && state != JobState.FAILED
        && state != JobState.KILLED && state != JobState.ERROR) {
      lastNonFinalState = state;
    }
  }

  @Private
  public JobStateInternal getInternalState() {
    readLock.lock();
    try {
      if (forcedState != null) {
        return forcedState;
      }
      return getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
  }

  private JobState getExternalState(JobStateInternal smState) {
    switch (smState) {
    case KILL_WAIT:
    case KILL_ABORT:
      return JobState.KILLED;
    case SETUP:
    case COMMITTING:
      return JobState.RUNNING;
    case FAIL_WAIT:
    case FAIL_ABORT:
      return JobState.FAILED;
    case REBOOT:
      if (appContext.isLastAMRetry()) {
        return JobState.ERROR;
      } else {
        // If this is not the last AM retry, report RUNNING externally;
        // otherwise the JobClient would exit as soon as it polls the AM
        // for the job state.
        return JobState.RUNNING;
      }
    default:
      return JobState.valueOf(smState.name());
    }
  }


  // helpful in testing
  protected void addTask(Task task) {
    synchronized (tasksSyncHandle) {
      if (lazyTasksCopyNeeded) {
        Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>();
        newTasks.putAll(tasks);
        tasks = newTasks;
        lazyTasksCopyNeeded = false;
      }
    }
    tasks.put(task.getID(), task);
    if (task.getType() == TaskType.MAP) {
      mapTasks.add(task.getID());
    } else if (task.getType() == TaskType.REDUCE) {
      reduceTasks.add(task.getID());
    }
    metrics.waitingTask(task);
  }

  void setFinishTime() {
    finishTime = clock.getTime();
  }

  void logJobHistoryFinishedEvent() {
    this.setFinishTime();
    JobFinishedEvent jfe = createJobFinishedEvent(this);
    LOG.info("Calling handler for JobFinishedEvent ");
    this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));
  }

  /**
   * Create the default file system for this job.
   * @param conf the conf object
   * @return the default filesystem for this job
   * @throws IOException
   */
  protected FileSystem getFileSystem(Configuration conf) throws IOException {
    return FileSystem.get(conf);
  }

  protected JobStateInternal checkReadyForCommit() {
    JobStateInternal currentState = getInternalState();
    if (completedTaskCount == tasks.size()
        && currentState == JobStateInternal.RUNNING) {
      eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
      return JobStateInternal.COMMITTING;
    }
    // the job is not ready to commit yet; return the current state
    return getInternalState();
  }

  JobStateInternal finished(JobStateInternal finalState) {
    if (getInternalState() == JobStateInternal.RUNNING) {
      metrics.endRunningJob(this);
    }
    if (finishTime == 0) setFinishTime();
    eventHandler.handle(new JobFinishEvent(jobId));

    switch (finalState) {
      case KILLED:
        metrics.killedJob(this);
        break;
      case REBOOT:
      case ERROR:
      case FAILED:
        metrics.failedJob(this);
        break;
      case SUCCEEDED:
        metrics.completedJob(this);
        break;
      default:
        throw new IllegalArgumentException("Illegal job state: " + finalState);
    }
    return finalState;
  }

  @Override
  public String getUserName() {
    return userName;
  }

  @Override
  public String getQueueName() {
    return queueName;
  }

  @Override
  public void setQueueName(String queueName) {
    this.queueName = queueName;
    JobQueueChangeEvent jqce = new JobQueueChangeEvent(oldJobId, queueName);
    eventHandler.handle(new JobHistoryEvent(jobId, jqce));
  }

  /*
   * (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
   */
  @Override
  public Path getConfFile() {
    return remoteJobConfFile;
  }

  @Override
  public String getName() {
    return jobName;
  }

  @Override
  public int getTotalMaps() {
    return mapTasks.size();  // FIXME: why indirection? return numMapTasks...
                             // unless race?  how soon can this get called?
  }

  @Override
  public int getTotalReduces() {
    return reduceTasks.size();  // FIXME: why indirection? return numReduceTasks
  }

  /*
   * (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.v2.app.job.Job#getJobACLs()
   */
  @Override
  public Map<JobACL, AccessControlList> getJobACLs() {
    return Collections.unmodifiableMap(jobACLs);
  }

  @Override
  public List<AMInfo> getAMInfos() {
    return amInfos;
  }

1196   /**
1197    * Decide whether job can be run in uber mode based on various criteria.
1198    * @param dataInputLength Total length for all splits
1199    */
makeUberDecision(long dataInputLength)1200   private void makeUberDecision(long dataInputLength) {
1201     //FIXME:  need new memory criterion for uber-decision (oops, too late here;
1202     // until AM-resizing supported,
1203     // must depend on job client to pass fat-slot needs)
1204     // these are no longer "system" settings, necessarily; user may override
1205     int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
1206 
1207     int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
1208 
1209     long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
1210         fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from
1211                                    // [File?]InputFormat and default block size
1212                                    // from that
1213 
1214     long sysMemSizeForUberSlot =
1215         conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
1216             MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
1217 
1218     long sysCPUSizeForUberSlot =
1219         conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
1220             MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
1221 
1222     boolean uberEnabled =
1223         conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
1224     boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
1225     boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
1226     boolean smallInput = (dataInputLength <= sysMaxBytes);
1227     // ignoring overhead due to UberAM and statics as negligible here:
1228     long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
1229     long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
1230     long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
1231     int requiredMapCores = conf.getInt(
1232             MRJobConfig.MAP_CPU_VCORES,
1233             MRJobConfig.DEFAULT_MAP_CPU_VCORES);
1234     int requiredReduceCores = conf.getInt(
1235             MRJobConfig.REDUCE_CPU_VCORES,
1236             MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
1237     int requiredCores = Math.max(requiredMapCores, requiredReduceCores);
1238     if (numReduceTasks == 0) {
1239       requiredMB = requiredMapMB;
1240       requiredCores = requiredMapCores;
1241     }
1242     boolean smallMemory =
1243         (requiredMB <= sysMemSizeForUberSlot)
1244         || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
1245 
1246     boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
1247     boolean notChainJob = !isChainJob(conf);
1248 
    // User has overall veto power over uberization, or user can modify
    // limits (overriding system settings and potentially shooting
    // themselves in the foot).  Note that ChainMapper/Reducer are
    // fundamentally incompatible with MR-1220; they employ a blocking
    // queue between the maps/reduces and thus require parallel execution,
    // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
    // and thus requires sequential execution.
    isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
        && smallInput && smallMemory && smallCpu
        && notChainJob;

    if (isUber) {
      LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
          + numReduceTasks + "r tasks (" + dataInputLength
          + " input bytes) will run sequentially on single node.");

      // make sure reduces are scheduled only after all maps are completed
      conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
                        1.0f);
      // uber-subtask attempts all get launched on same node; if one fails,
      // probably should retry elsewhere, i.e., move entire uber-AM:  ergo,
      // limit attempts to 1 (or at most 2?  probably not...)
      conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
      conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);

      // disable speculation
      conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
      conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
    } else {
      StringBuilder msg = new StringBuilder();
      msg.append("Not uberizing ").append(jobId).append(" because:");
      if (!uberEnabled)
        msg.append(" not enabled;");
      if (!smallNumMapTasks)
        msg.append(" too many maps;");
      if (!smallNumReduceTasks)
        msg.append(" too many reduces;");
      if (!smallInput)
        msg.append(" too much input;");
      if (!smallCpu)
        msg.append(" too much CPU;");
      if (!smallMemory)
        msg.append(" too much RAM;");
      if (!notChainJob)
        msg.append(" chainjob;");
      LOG.info(msg.toString());
    }
  }
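
  // Illustrative sketch (not part of this class): a client-side configuration
  // that would typically satisfy the decision above, assuming the job's
  // input size, memory, and vcore needs also fit within the AM's resources:
  //
  //   Configuration conf = new Configuration();
  //   conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true); // user veto off
  //   conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);       // <= sysMaxMaps
  //   conf.setInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);    // <= sysMaxReduces
  //
  // With numMapTasks <= 9, numReduceTasks <= 1, input below one block size,
  // per-task MB/vcores no larger than the AM's, and no chain mapper/reducer,
  // isUber evaluates to true.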

  /**
   * ChainMapper and ChainReducer must execute in parallel, so they're not
   * compatible with uberization/LocalContainerLauncher (100% sequential).
   */
  private boolean isChainJob(Configuration conf) {
    boolean isChainJob = false;
    try {
      String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
      if (mapClassName != null) {
        Class<?> mapClass = Class.forName(mapClassName);
        if (ChainMapper.class.isAssignableFrom(mapClass))
          isChainJob = true;
      }
    } catch (ClassNotFoundException cnfe) {
      // don't care; assume it's not derived from ChainMapper
    } catch (NoClassDefFoundError ignored) {
      // same as above: treat an unloadable class as a non-chain mapper
    }
    try {
      String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
      if (reduceClassName != null) {
        Class<?> reduceClass = Class.forName(reduceClassName);
        if (ChainReducer.class.isAssignableFrom(reduceClass))
          isChainJob = true;
      }
    } catch (ClassNotFoundException cnfe) {
      // don't care; assume it's not derived from ChainReducer
    } catch (NoClassDefFoundError ignored) {
      // same as above: treat an unloadable class as a non-chain reducer
    }
    return isChainJob;
  }
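
  // For context, a sketch of the kind of job this check rejects (hypothetical
  // AMap/BMap mapper classes; ChainMapper.addMapper as in
  // org.apache.hadoop.mapreduce.lib.chain):
  //
  //   Job job = Job.getInstance(conf);
  //   ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
  //       Text.class, Text.class, new Configuration(false));
  //   ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
  //       LongWritable.class, Text.class, new Configuration(false));
  //
  // Each link feeds the next through a blocking queue, so the links must run
  // concurrently; the sequential uber-AM cannot provide that.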

  private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
    // rerun previously successful map tasks
    List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
    if (taskAttemptIdList != null) {
      String mesg = "TaskAttempt killed because it ran on unusable node "
          + nodeId;
      for (TaskAttemptId id : taskAttemptIdList) {
        if (TaskType.MAP == id.getTaskId().getTaskType()) {
          // reschedule only map tasks because their outputs may be unusable
          LOG.info(mesg + ". AttemptId:" + id);
          eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
        }
      }
    }
    // currently running task attempts on unusable nodes are handled in
    // RMContainerAllocator
  }

  /*
  private long getBlockSize() {
    String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
    if (inputClassName != null) {
      Class<?> inputClass = Class.forName(inputClassName);
      if (FileInputFormat.class.isAssignableFrom(inputClass)) {
        // derive the default block size from that format's FileSystem
      }
    }
  }
  */
  /**
   * Get the workflow adjacencies from the job conf.
   * The string returned is of the form "key"="value" "key"="value" ...
   */
  private static String getWorkflowAdjacencies(Configuration conf) {
    int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
    Map<String,String> adjacencies =
        conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
    if (adjacencies.isEmpty()) {
      return "";
    }
    int size = 0;
    for (Entry<String,String> entry : adjacencies.entrySet()) {
      int keyLen = entry.getKey().length();
      size += keyLen - prefixLen;
      size += entry.getValue().length() + 6; // quotes, '=', and trailing space
    }
    StringBuilder sb = new StringBuilder(size);
    for (Entry<String,String> entry : adjacencies.entrySet()) {
      int keyLen = entry.getKey().length();
      sb.append("\"");
      sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
      sb.append("\"=\"");
      sb.append(escapeString(entry.getValue()));
      sb.append("\" ");
    }
    return sb.toString();
  }
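
  // Worked example (hypothetical workflow properties, assuming the adjacency
  // prefix is "mapreduce.workflow.adjacency."): a conf containing
  //   mapreduce.workflow.adjacency.A = B
  //   mapreduce.workflow.adjacency.B = C
  // yields the string: "A"="B" "B"="C"
  // (each key is the property name with the prefix stripped, then escaped).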

  public static String escapeString(String data) {
    return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
        new char[] {'"', '=', '.'});
  }
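
  // For instance, assuming StringUtils.ESCAPE_CHAR is '\\':
  //   escapeString("a.b=c") returns "a\.b\=c"
  // so '.', '=', and '"' inside keys or values never break the
  // "key"="value" framing produced by getWorkflowAdjacencies().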

  public static class InitTransition
      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {

    /**
     * Note that this transition method is called directly (and synchronously)
     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
     * just a plain sequential call within the AM context), so we can trigger
     * modifications in AM state from here (at least, if the AM is written
     * that way, as the MR version is).
     */
    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {
      job.metrics.submittedJob(job);
      job.metrics.preparingJob(job);

      if (job.newApiCommitter) {
        job.jobContext = new JobContextImpl(job.conf,
            job.oldJobId);
      } else {
        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
            job.conf, job.oldJobId);
      }

      try {
        setup(job);
        job.fs = job.getFileSystem(job.conf);

        //log to job history
        JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
            job.conf.get(MRJobConfig.JOB_NAME, "test"),
            job.conf.get(MRJobConfig.USER_NAME, "mapred"),
            job.appSubmitTime,
            job.remoteJobConfFile.toString(),
            job.jobACLs, job.queueName,
            job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
            getWorkflowAdjacencies(job.conf),
            job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
        //TODO JH Verify jobACLs, UserName via UGI?

        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
        job.numMapTasks = taskSplitMetaInfo.length;
        job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

        if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
          job.addDiagnostic("Number of maps and reduces is 0 for job "
              + job.jobId);
        } else if (job.numMapTasks == 0) {
          job.reduceWeight = 0.9f;
        } else if (job.numReduceTasks == 0) {
          job.mapWeight = 0.9f;
        } else {
          job.mapWeight = job.reduceWeight = 0.45f;
        }

        checkTaskLimits();

        long inputLength = 0;
        for (int i = 0; i < job.numMapTasks; ++i) {
          inputLength += taskSplitMetaInfo[i].getInputDataLength();
        }

        job.makeUberDecision(inputLength);

        job.taskAttemptCompletionEvents =
            new ArrayList<TaskAttemptCompletionEvent>(
                job.numMapTasks + job.numReduceTasks + 10);
        job.mapAttemptCompletionEvents =
            new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
        job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
            job.numMapTasks + job.numReduceTasks + 10);

        job.allowedMapFailuresPercent =
            job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
        job.allowedReduceFailuresPercent =
            job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

        // create the Tasks but don't start them yet
        createMapTasks(job, inputLength, taskSplitMetaInfo);
        createReduceTasks(job);

        job.metrics.endPreparingJob(job);
        return JobStateInternal.INITED;
      } catch (Exception e) {
        LOG.warn("Job init failed", e);
        job.metrics.endPreparingJob(job);
        job.addDiagnostic("Job init failed: "
            + StringUtils.stringifyException(e));
        // Leave the job in the NEW state. The MR AM will detect that the
        // state is not INITED and send a JOB_INIT_FAILED event.
        return JobStateInternal.NEW;
      }
    }

    protected void setup(JobImpl job) throws IOException {

      String oldJobIDString = job.oldJobId.toString();
      String user =
        UserGroupInformation.getCurrentUser().getShortUserName();
      Path path = MRApps.getStagingAreaDir(job.conf, user);
      if (LOG.isDebugEnabled()) {
        LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
      }

      job.remoteJobSubmitDir =
          FileSystem.get(job.conf).makeQualified(
              new Path(path, oldJobIDString));
      job.remoteJobConfFile =
          new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

      // Prepare the TaskAttemptListener server for authentication of
      // Containers. TaskAttemptListener gets the information via
      // jobTokenSecretManager.
      JobTokenIdentifier identifier =
          new JobTokenIdentifier(new Text(oldJobIDString));
      job.jobToken =
          new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
      job.jobToken.setService(identifier.getJobId());
      // Add it to the jobTokenSecretManager so that the TaskAttemptListener
      // server can authenticate containers (tasks)
      job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
      LOG.info("Adding job token for " + oldJobIDString
          + " to jobTokenSecretManager");

      // If the job client did not set up the shuffle secret then reuse
      // the job token secret for the shuffle.
      if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) {
        LOG.warn("Shuffle secret key missing from job credentials."
            + " Using job token secret as shuffle secret.");
        TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
            job.jobCredentials);
      }
    }

    private void createMapTasks(JobImpl job, long inputLength,
                                TaskSplitMetaInfo[] splits) {
      for (int i = 0; i < job.numMapTasks; ++i) {
        TaskImpl task =
            new MapTaskImpl(job.jobId, i,
                job.eventHandler,
                job.remoteJobConfFile,
                job.conf, splits[i],
                job.taskAttemptListener,
                job.jobToken, job.jobCredentials,
                job.clock,
                job.applicationAttemptId.getAttemptId(),
                job.metrics, job.appContext);
        job.addTask(task);
      }
      LOG.info("Input size for job " + job.jobId + " = " + inputLength
          + ". Number of splits = " + splits.length);
    }

    private void createReduceTasks(JobImpl job) {
      for (int i = 0; i < job.numReduceTasks; i++) {
        TaskImpl task =
            new ReduceTaskImpl(job.jobId, i,
                job.eventHandler,
                job.remoteJobConfFile,
                job.conf, job.numMapTasks,
                job.taskAttemptListener, job.jobToken,
                job.jobCredentials, job.clock,
                job.applicationAttemptId.getAttemptId(),
                job.metrics, job.appContext);
        job.addTask(task);
      }
      LOG.info("Number of reduces for job " + job.jobId + " = "
          + job.numReduceTasks);
    }

    protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
      TaskSplitMetaInfo[] allTaskSplitMetaInfo;
      try {
        allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
            job.oldJobId, job.fs,
            job.conf,
            job.remoteJobSubmitDir);
      } catch (IOException e) {
        throw new YarnRuntimeException(e);
      }
      return allTaskSplitMetaInfo;
    }

    /**
     * If the number of tasks is greater than the configured value,
     * throw an exception that will fail job initialization.
     */
    private void checkTaskLimits() {
      // no code, for now
    }
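
    // A minimal sketch of what such a check could look like (hypothetical
    // property name and limit; deliberately not implemented above):
    //
    //   int maxTasks = job.conf.getInt("mapreduce.job.max.task", -1);
    //   if (maxTasks > 0 && job.numMapTasks + job.numReduceTasks > maxTasks) {
    //     throw new IllegalArgumentException(
    //         "The number of tasks exceeds the configured limit " + maxTasks);
    //   }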
  } // end of InitTransition

  private static class InitFailedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    }
  }

  private static class SetupCompletedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.setupProgress = 1.0f;
      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
      job.scheduleTasks(job.reduceTasks, true);

      // If we have no tasks, just transition to job completed
      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
        job.eventHandler.handle(new JobEvent(job.jobId,
            JobEventType.JOB_COMPLETED));
      }
    }
  }

  private static class SetupFailedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.metrics.endRunningJob(job);
      job.addDiagnostic("Job setup failed: "
          + ((JobSetupFailedEvent) event).getMessage());
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    }
  }

  public static class StartTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    /**
     * This transition executes in the event-dispatcher thread, though it's
     * triggered in MRAppMaster's startJobs() method.
     */
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobStartEvent jse = (JobStartEvent) event;
      if (jse.getRecoveredJobStartTime() != 0) {
        job.startTime = jse.getRecoveredJobStartTime();
      } else {
        job.startTime = job.clock.getTime();
      }
      JobInitedEvent jie =
          new JobInitedEvent(job.oldJobId,
              job.startTime,
              job.numMapTasks, job.numReduceTasks,
              job.getState().toString(),
              job.isUber());
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
      JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
          job.appSubmitTime, job.startTime);
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
      job.metrics.runningJob(job);

      job.eventHandler.handle(new CommitterJobSetupEvent(
              job.jobId, job.jobContext));
    }
  }

  private void unsuccessfulFinish(JobStateInternal finalState) {
    if (finishTime == 0) {
      setFinishTime();
    }
    cleanupProgress = 1.0f;
    JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
        new JobUnsuccessfulCompletionEvent(oldJobId,
            finishTime,
            succeededMapTaskCount,
            succeededReduceTaskCount,
            finalState.toString(),
            diagnostics);
    eventHandler.handle(new JobHistoryEvent(jobId,
        unsuccessfulJobEvent));
    finished(finalState);
  }

  private static class JobAbortCompletedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobStateInternal finalState = JobStateInternal.valueOf(
          ((JobAbortCompletedEvent) event).getFinalState().name());
      job.unsuccessfulFinish(finalState);
    }
  }

  // This transition happens when a job is to be failed. It waits for all the
  // tasks to finish / be killed.
  private static class JobFailWaitTransition
      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {
      if (!job.failWaitTriggerScheduledFuture.isCancelled()) {
        for (Task task : job.tasks.values()) {
          if (!task.isFinished()) {
            return JobStateInternal.FAIL_WAIT;
          }
        }
      }
      // Finished waiting. All tasks finished / were killed
      job.failWaitTriggerScheduledFuture.cancel(false);
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
      return JobStateInternal.FAIL_ABORT;
    }
  }

  // This transition happens when a job to be failed times out while waiting on
  // tasks that had been sent the KILL signal. It is triggered by a
  // ScheduledFuture task queued in the executor.
  private static class JobFailWaitTimedOutTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
          + " Going to fail job anyway");
      job.failWaitTriggerScheduledFuture.cancel(false);
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    }
  }

  // JobFinishedEvent triggers the move of the history file out of the staging
  // area. May need to create a new event type for this if JobFinished should
  // not be generated for KilledJobs, etc.
  private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {

    job.mayBeConstructFinalFullCounters();

    JobFinishedEvent jfe = new JobFinishedEvent(
        job.oldJobId, job.finishTime,
        job.succeededMapTaskCount, job.succeededReduceTaskCount,
        job.failedMapTaskCount, job.failedReduceTaskCount,
        job.finalMapCounters,
        job.finalReduceCounters,
        job.fullCounters);
    return jfe;
  }

  private void mayBeConstructFinalFullCounters() {
    // Calculating full-counters. This should happen only once for the job.
    synchronized (this.fullCountersLock) {
      if (this.fullCounters != null) {
        // Already constructed. Just return.
        return;
      }
      this.constructFinalFullcounters();
    }
  }

  @Private
  public void constructFinalFullcounters() {
    this.fullCounters = new Counters();
    this.finalMapCounters = new Counters();
    this.finalReduceCounters = new Counters();
    this.fullCounters.incrAllCounters(jobCounters);
    for (Task t : this.tasks.values()) {
      Counters counters = t.getCounters();
      switch (t.getType()) {
      case MAP:
        this.finalMapCounters.incrAllCounters(counters);
        break;
      case REDUCE:
        this.finalReduceCounters.incrAllCounters(counters);
        break;
      default:
        throw new IllegalStateException("Task type neither map nor reduce: " +
            t.getType());
      }
      this.fullCounters.incrAllCounters(counters);
    }
  }
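
  // Informal summary of the aggregation above (not normative):
  //   fullCounters        = jobCounters + sum(counters of every task)
  //   finalMapCounters    = sum(counters of MAP tasks only)
  //   finalReduceCounters = sum(counters of REDUCE tasks only)
  // mayBeConstructFinalFullCounters() guards this with fullCountersLock so
  // the aggregation runs at most once per job.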

  // Task-start has been moved out of InitTransition, so this arc simply
  // hardcodes 0 for both map and reduce finished tasks.
  private static class KillNewJobTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.setFinishTime();
      JobUnsuccessfulCompletionEvent failedEvent =
          new JobUnsuccessfulCompletionEvent(job.oldJobId,
              job.finishTime, 0, 0,
              JobStateInternal.KILLED.toString(), job.diagnostics);
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
      job.finished(JobStateInternal.KILLED);
    }
  }

  private static class KillInitedJobTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.addDiagnostic("Job received Kill in INITED state.");
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
    }
  }

  private static class KilledDuringSetupTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.metrics.endRunningJob(job);
      job.addDiagnostic("Job received kill in SETUP state.");
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
    }
  }

  private static class KillTasksTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.addDiagnostic(JOB_KILLED_DIAG);
      for (Task task : job.tasks.values()) {
        job.eventHandler.handle(
            new TaskEvent(task.getID(), TaskEventType.T_KILL));
      }
      job.metrics.endRunningJob(job);
    }
  }

  private static class TaskAttemptCompletedEventTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      TaskAttemptCompletionEvent tce =
          ((JobTaskAttemptCompletedEvent) event).getCompletionEvent();
      // Add the TaskAttemptCompletionEvent.
      // The eventId is equal to the event's index in the ArrayList.
      tce.setEventId(job.taskAttemptCompletionEvents.size());
      job.taskAttemptCompletionEvents.add(tce);
      int mapEventIdx = -1;
      if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
        // we track map completions separately from task completions because
        // - getMapAttemptCompletionEvents uses index ranges specific to maps
        // - type converting the same events over and over is expensive
        mapEventIdx = job.mapAttemptCompletionEvents.size();
        job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
      }
      job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);
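      // For example (illustrative): if completion events arrive in the order
      // map0, reduce0, map1, then taskAttemptCompletionEvents holds all
      // three, mapAttemptCompletionEvents holds [map0, map1], and
      // taskCompletionIdxToMapCompletionIdx is [0, -1, 1], mapping each task
      // event index to its map-only index (or -1 for non-map events).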

      TaskAttemptId attemptId = tce.getAttemptId();
      TaskId taskId = attemptId.getTaskId();
      // mark the previous successful completion event for this task, if any,
      // as obsolete
      Integer successEventNo =
          job.successAttemptCompletionEventNoMap.remove(taskId);
      if (successEventNo != null) {
        TaskAttemptCompletionEvent successEvent =
            job.taskAttemptCompletionEvents.get(successEventNo);
        successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
        int mapCompletionIdx =
            job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
        if (mapCompletionIdx >= 0) {
          // update the corresponding TaskCompletionEvent for the map
          TaskCompletionEvent mapEvent =
              job.mapAttemptCompletionEvents.get(mapCompletionIdx);
          job.mapAttemptCompletionEvents.set(mapCompletionIdx,
              new TaskCompletionEvent(mapEvent.getEventId(),
                  mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
                  mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
                  mapEvent.getTaskTrackerHttp()));
        }
      }

      // if this attempt is not successful, then why is the previous
      // successful attempt being removed above? See MAPREDUCE-4330.
      if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
        job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());

        // here we could have simply called Task.getSuccessfulAttempt(), but
        // the event that triggers this code is sent before
        // Task.successfulAttempt is set and so there is no guarantee that it
        // will be available now
        Task task = job.tasks.get(taskId);
        TaskAttempt attempt = task.getAttempt(attemptId);
        NodeId nodeId = attempt.getNodeId();
        assert (nodeId != null); // node must exist for a successful event
        List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
            .get(nodeId);
        if (taskAttemptIdList == null) {
          taskAttemptIdList = new ArrayList<TaskAttemptId>();
          job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
        }
        taskAttemptIdList.add(attempt.getID());
      }
    }
  }

  private static class TaskAttemptFetchFailureTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      // get the number of shuffling reduces
      int shufflingReduceTasks = 0;
      for (TaskId taskId : job.reduceTasks) {
        Task task = job.tasks.get(taskId);
        if (TaskState.RUNNING.equals(task.getState())) {
          for (TaskAttempt attempt : task.getAttempts().values()) {
            if (attempt.getPhase() == Phase.SHUFFLE) {
              shufflingReduceTasks++;
              break;
            }
          }
        }
      }

      JobTaskAttemptFetchFailureEvent fetchfailureEvent =
          (JobTaskAttemptFetchFailureEvent) event;
      for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId :
            fetchfailureEvent.getMaps()) {
        Integer fetchFailures = job.fetchFailuresMapping.get(mapId);
        fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures + 1);
        job.fetchFailuresMapping.put(mapId, fetchFailures);

        float failureRate = shufflingReduceTasks == 0 ? 1.0f :
            (float) fetchFailures / shufflingReduceTasks;
        // declare faulty if fetch-failures >= max-allowed-failures
        if (fetchFailures >= job.getMaxFetchFailuresNotifications()
            && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
          LOG.info("Too many fetch-failures for output of task attempt: " +
              mapId + " ... raising fetch failure to map");
          job.eventHandler.handle(new TaskAttemptEvent(mapId,
              TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
          job.fetchFailuresMapping.remove(mapId);
        }
      }
    }
  }
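
  // Worked example (illustrative numbers): with 10 reduces currently in the
  // SHUFFLE phase, maxFetchFailuresNotifications = 3, and
  // maxAllowedFetchFailuresFraction = 0.5, a map with 3 reported fetch
  // failures has failureRate = 3/10 = 0.3 < 0.5, so it is not yet declared
  // faulty; at 5 failures the rate reaches 0.5 and TA_TOO_MANY_FETCH_FAILURE
  // is raised. Both thresholds must be met before the map is re-run.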

  private static class TaskCompletedTransition implements
      MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {

    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {
      job.completedTaskCount++;
      LOG.info("Num completed Tasks: " + job.completedTaskCount);
      JobTaskEvent taskEvent = (JobTaskEvent) event;
      Task task = job.tasks.get(taskEvent.getTaskID());
      if (taskEvent.getState() == TaskState.SUCCEEDED) {
        taskSucceeded(job, task);
      } else if (taskEvent.getState() == TaskState.FAILED) {
        taskFailed(job, task);
      } else if (taskEvent.getState() == TaskState.KILLED) {
        taskKilled(job, task);
      }

      return checkJobAfterTaskCompletion(job);
    }

    // This class is used to queue a ScheduledFuture that sends an event to a
    // job after some delay. It can be used to wait a maximum amount of time
    // before proceeding anyway, e.g. when a job is waiting in FAIL_WAIT for
    // all tasks to be killed.
    static class TriggerScheduledFuture implements Runnable {
      JobEvent toSend;
      JobImpl job;
      TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
        this.toSend = toSend;
        this.job = job;
      }
      @Override
      public void run() {
        LOG.info("Sending event " + toSend + " to " + job.getID());
        job.getEventHandler().handle(toSend);
      }
    }
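
    // Usage (see checkJobAfterTaskCompletion below): the trigger is handed to
    // job.executor.schedule(...) with the committer-cancel timeout, and the
    // resulting ScheduledFuture is kept in job.failWaitTriggerScheduledFuture
    // so it can be cancelled once all tasks actually finish.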

    protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
      // check for Job failure
      if (job.failedMapTaskCount * 100 >
          job.allowedMapFailuresPercent * job.numMapTasks ||
          job.failedReduceTaskCount * 100 >
          job.allowedReduceFailuresPercent * job.numReduceTasks) {
        job.setFinishTime();

        String diagnosticMsg = "Job failed as tasks failed. " +
            "failedMaps:" + job.failedMapTaskCount +
            " failedReduces:" + job.failedReduceTaskCount;
        LOG.info(diagnosticMsg);
        job.addDiagnostic(diagnosticMsg);

        // Send kill signal to all unfinished tasks here.
        boolean allDone = true;
        for (Task task : job.tasks.values()) {
          if (!task.isFinished()) {
            allDone = false;
            job.eventHandler.handle(
                new TaskEvent(task.getID(), TaskEventType.T_KILL));
          }
        }

        // If all tasks are already done, we should go directly to FAIL_ABORT
        if (allDone) {
          job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
              job.jobContext,
              org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
          return JobStateInternal.FAIL_ABORT;
        }

        // Set the max timeout to wait for the tasks to get killed
        job.failWaitTriggerScheduledFuture = job.executor.schedule(
            new TriggerScheduledFuture(job, new JobEvent(job.getID(),
                JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
                    MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
                    MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
                    TimeUnit.MILLISECONDS);
        return JobStateInternal.FAIL_WAIT;
      }

      return job.checkReadyForCommit();
    }
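
    // Worked example of the failure check (illustrative numbers): with
    // numMapTasks = 10 and allowedMapFailuresPercent = 20, the job fails once
    // failedMapTaskCount * 100 > 20 * 10, i.e. on the 3rd failed map. The
    // integer cross-multiplication avoids floating-point percentage math.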

    private void taskSucceeded(JobImpl job, Task task) {
      if (task.getType() == TaskType.MAP) {
        job.succeededMapTaskCount++;
      } else {
        job.succeededReduceTaskCount++;
      }
      job.metrics.completedTask(task);
    }

    private void taskFailed(JobImpl job, Task task) {
      if (task.getType() == TaskType.MAP) {
        job.failedMapTaskCount++;
      } else if (task.getType() == TaskType.REDUCE) {
        job.failedReduceTaskCount++;
      }
      job.addDiagnostic("Task failed " + task.getID());
      job.metrics.failedTask(task);
    }

    private void taskKilled(JobImpl job, Task task) {
      if (task.getType() == TaskType.MAP) {
        job.killedMapTaskCount++;
      } else if (task.getType() == TaskType.REDUCE) {
        job.killedReduceTaskCount++;
      }
      job.metrics.killedTask(task);
    }
  }

  // Transition class for handling jobs with no tasks
  private static class JobNoTasksCompletedTransition implements
      MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {

    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {
      return job.checkReadyForCommit();
    }
  }

  private static class CommitSucceededTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.logJobHistoryFinishedEvent();
      job.finished(JobStateInternal.SUCCEEDED);
    }
  }

  private static class CommitFailedTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobCommitFailedEvent jcfe = (JobCommitFailedEvent) event;
      job.addDiagnostic("Job commit failed: " + jcfe.getMessage());
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
    }
  }

  private static class KilledDuringCommitTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.setFinishTime();
      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
          job.jobContext,
          org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
    }
  }

  private static class KilledDuringAbortTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.unsuccessfulFinish(JobStateInternal.KILLED);
    }
  }

  private static class MapTaskRescheduledTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      // a previously succeeded map task is being restarted, so back out its
      // completion counts
      job.completedTaskCount--;
      job.succeededMapTaskCount--;
    }
  }

  private static class KillWaitTaskCompletedTransition extends
      TaskCompletedTransition {
    @Override
    protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
      if (job.completedTaskCount == job.tasks.size()) {
        job.setFinishTime();
        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
            job.jobContext,
            org.apache.hadoop.mapreduce.JobStatus.State.KILLED));
        return JobStateInternal.KILL_ABORT;
      }
      // return the current state; the job is not finished yet
      return job.getInternalState();
    }
  }

  protected void addDiagnostic(String diag) {
    diagnostics.add(diag);
  }

  private static class DiagnosticsUpdateTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.addDiagnostic(((JobDiagnosticsUpdateEvent) event)
          .getDiagnosticUpdate());
    }
  }

  private static class CounterUpdateTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
      for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
          .getCounterUpdates()) {
        job.jobCounters.findCounter(ci.getCounterKey()).increment(
            ci.getIncrementValue());
      }
    }
  }

  private static class UpdatedNodesTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event;
      for (NodeReport nr : updateEvent.getUpdatedNodes()) {
        NodeState nodeState = nr.getNodeState();
        if (nodeState.isUnusable()) {
          // act on the updates
          job.actOnUnusableNode(nr.getNodeId(), nodeState);
        }
      }
    }
  }

  private static class InternalTerminationTransition implements
      SingleArcTransition<JobImpl, JobEvent> {
    JobStateInternal terminationState = null;
    String jobHistoryString = null;
    public InternalTerminationTransition(JobStateInternal stateInternal,
        String jobHistoryString) {
      this.terminationState = stateInternal;
      // mostly a hack for the jobhistoryserver
      this.jobHistoryString = jobHistoryString;
    }

    @Override
    public void transition(JobImpl job, JobEvent event) {
      //TODO Is this JH event required?
      job.setFinishTime();
      JobUnsuccessfulCompletionEvent failedEvent =
          new JobUnsuccessfulCompletionEvent(job.oldJobId,
              job.finishTime, 0, 0,
              jobHistoryString, job.diagnostics);
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
      job.finished(terminationState);
    }
  }

  private static class InternalErrorTransition
      extends InternalTerminationTransition {
    public InternalErrorTransition() {
      super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString());
    }
  }

  private static class InternalRebootTransition
      extends InternalTerminationTransition {
    public InternalRebootTransition() {
      super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString());
    }
  }

  @Override
  public Configuration loadConfFile() throws IOException {
    Path confPath = getConfFile();
    FileContext fc = FileContext.getFileContext(confPath.toUri(), conf);
    Configuration jobConf = new Configuration(false);
    jobConf.addResource(fc.open(confPath), confPath.toString());
    return jobConf;
  }

  public float getMaxAllowedFetchFailuresFraction() {
    return maxAllowedFetchFailuresFraction;
  }

  public int getMaxFetchFailuresNotifications() {
    return maxFetchFailuresNotifications;
  }
}