1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.v2.app.job.impl; 20 21 import java.io.IOException; 22 import java.util.ArrayList; 23 import java.util.Collection; 24 import java.util.Collections; 25 import java.util.EnumSet; 26 import java.util.HashMap; 27 import java.util.LinkedHashMap; 28 import java.util.LinkedHashSet; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.Map.Entry; 32 import java.util.Set; 33 import java.util.concurrent.ScheduledFuture; 34 import java.util.concurrent.ScheduledThreadPoolExecutor; 35 import java.util.concurrent.ThreadFactory; 36 import java.util.concurrent.TimeUnit; 37 import java.util.concurrent.locks.Lock; 38 import java.util.concurrent.locks.ReadWriteLock; 39 import java.util.concurrent.locks.ReentrantReadWriteLock; 40 41 import org.apache.commons.logging.Log; 42 import org.apache.commons.logging.LogFactory; 43 import org.apache.hadoop.classification.InterfaceAudience.Private; 44 import org.apache.hadoop.conf.Configuration; 45 import org.apache.hadoop.fs.FileContext; 46 import org.apache.hadoop.fs.FileSystem; 47 import org.apache.hadoop.fs.Path; 48 import org.apache.hadoop.io.Text; 49 import org.apache.hadoop.mapred.JobACLsManager; 50 import org.apache.hadoop.mapred.JobConf; 51 import org.apache.hadoop.mapred.TaskCompletionEvent; 52 import org.apache.hadoop.mapreduce.Counters; 53 import org.apache.hadoop.mapreduce.JobACL; 54 import org.apache.hadoop.mapreduce.JobContext; 55 import org.apache.hadoop.mapreduce.MRJobConfig; 56 import org.apache.hadoop.mapreduce.OutputCommitter; 57 import org.apache.hadoop.mapreduce.TypeConverter; 58 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; 59 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; 60 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; 61 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; 62 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; 63 import org.apache.hadoop.mapreduce.jobhistory.JobQueueChangeEvent; 64 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; 65 import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; 66 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; 67 import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; 68 import org.apache.hadoop.mapreduce.security.TokenCache; 69 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; 70 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; 71 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; 72 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; 73 import org.apache.hadoop.mapreduce.task.JobContextImpl; 74 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; 75 import org.apache.hadoop.mapreduce.v2.api.records.JobId; 76 import org.apache.hadoop.mapreduce.v2.api.records.JobReport; 77 import org.apache.hadoop.mapreduce.v2.api.records.JobState; 78 import org.apache.hadoop.mapreduce.v2.api.records.Phase; 79 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; 80 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus; 81 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; 82 import org.apache.hadoop.mapreduce.v2.api.records.TaskId; 83 import org.apache.hadoop.mapreduce.v2.api.records.TaskState; 84 import org.apache.hadoop.mapreduce.v2.api.records.TaskType; 85 import org.apache.hadoop.mapreduce.v2.app.AppContext; 86 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; 87 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobAbortEvent; 88 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobCommitEvent; 89 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobSetupEvent; 90 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; 91 import org.apache.hadoop.mapreduce.v2.app.job.Task; 92 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; 93 import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent; 94 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent; 95 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; 96 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; 97 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; 98 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; 99 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; 100 import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent; 101 import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; 102 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; 103 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; 104 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; 105 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; 106 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; 107 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; 108 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; 109 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; 110 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; 111 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; 112 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; 113 import org.apache.hadoop.mapreduce.v2.util.MRApps; 114 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; 115 import org.apache.hadoop.security.Credentials; 116 import org.apache.hadoop.security.UserGroupInformation; 117 import org.apache.hadoop.security.authorize.AccessControlList; 118 import org.apache.hadoop.security.token.Token; 119 import org.apache.hadoop.util.StringUtils; 120 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 121 import org.apache.hadoop.yarn.api.records.NodeId; 122 import org.apache.hadoop.yarn.api.records.NodeReport; 123 import org.apache.hadoop.yarn.api.records.NodeState; 124 import org.apache.hadoop.yarn.event.EventHandler; 125 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 126 import org.apache.hadoop.yarn.state.InvalidStateTransitonException; 127 import org.apache.hadoop.yarn.state.MultipleArcTransition; 128 import org.apache.hadoop.yarn.state.SingleArcTransition; 129 import org.apache.hadoop.yarn.state.StateMachine; 130 import org.apache.hadoop.yarn.state.StateMachineFactory; 131 import org.apache.hadoop.yarn.util.Clock; 132 133 import com.google.common.util.concurrent.ThreadFactoryBuilder; 134 135 /** Implementation of Job interface. Maintains the state machines of Job. 136 * The read and write calls use ReadWriteLock for concurrency. 137 */ 138 @SuppressWarnings({ "rawtypes", "unchecked" }) 139 public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, 140 EventHandler<JobEvent> { 141 142 private static final TaskAttemptCompletionEvent[] 143 EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0]; 144 145 private static final TaskCompletionEvent[] 146 EMPTY_TASK_COMPLETION_EVENTS = new TaskCompletionEvent[0]; 147 148 private static final Log LOG = LogFactory.getLog(JobImpl.class); 149 150 //The maximum fraction of fetch failures allowed for a map 151 private float maxAllowedFetchFailuresFraction; 152 153 //Maximum no. of fetch-failure notifications after which map task is failed 154 private int maxFetchFailuresNotifications; 155 156 public static final String JOB_KILLED_DIAG = 157 "Job received Kill while in RUNNING state."; 158 159 //final fields 160 private final ApplicationAttemptId applicationAttemptId; 161 private final Clock clock; 162 private final JobACLsManager aclsManager; 163 private final String username; 164 private final Map<JobACL, AccessControlList> jobACLs; 165 private float setupWeight = 0.05f; 166 private float cleanupWeight = 0.05f; 167 private float mapWeight = 0.0f; 168 private float reduceWeight = 0.0f; 169 private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun; 170 private final List<AMInfo> amInfos; 171 private final Lock readLock; 172 private final Lock writeLock; 173 private final JobId jobId; 174 private final String jobName; 175 private final OutputCommitter committer; 176 private final boolean newApiCommitter; 177 private final org.apache.hadoop.mapreduce.JobID oldJobId; 178 private final TaskAttemptListener taskAttemptListener; 179 private final Object tasksSyncHandle = new Object(); 180 private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>(); 181 private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>(); 182 /** 183 * maps nodes to tasks that have run on those nodes 184 */ 185 private final HashMap<NodeId, List<TaskAttemptId>> 186 nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TaskAttemptId>>(); 187 188 private final EventHandler eventHandler; 189 private final MRAppMetrics metrics; 190 private final String userName; 191 private String queueName; 192 private final long appSubmitTime; 193 private final AppContext appContext; 194 195 private boolean lazyTasksCopyNeeded = false; 196 volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>(); 197 private Counters jobCounters = new Counters(); 198 private Object fullCountersLock = new Object(); 199 private Counters fullCounters = null; 200 private Counters finalMapCounters = null; 201 private Counters finalReduceCounters = null; 202 203 // FIXME: 204 // 205 // Can then replace task-level uber counters (MR-2424) with job-level ones 206 // sent from LocalContainerLauncher, and eventually including a count of 207 // of uber-AM attempts (probably sent from MRAppMaster). 208 public JobConf conf; 209 210 //fields initialized in init 211 private FileSystem fs; 212 private Path remoteJobSubmitDir; 213 public Path remoteJobConfFile; 214 private JobContext jobContext; 215 private int allowedMapFailuresPercent = 0; 216 private int allowedReduceFailuresPercent = 0; 217 private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents; 218 private List<TaskCompletionEvent> mapAttemptCompletionEvents; 219 private List<Integer> taskCompletionIdxToMapCompletionIdx; 220 private final List<String> diagnostics = new ArrayList<String>(); 221 222 //task/attempt related datastructures 223 private final Map<TaskId, Integer> successAttemptCompletionEventNoMap = 224 new HashMap<TaskId, Integer>(); 225 private final Map<TaskAttemptId, Integer> fetchFailuresMapping = 226 new HashMap<TaskAttemptId, Integer>(); 227 228 private static final DiagnosticsUpdateTransition 229 DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); 230 private static final InternalErrorTransition 231 INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); 232 private static final InternalRebootTransition 233 INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition(); 234 private static final TaskAttemptCompletedEventTransition 235 TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION = 236 new TaskAttemptCompletedEventTransition(); 237 private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION = 238 new CounterUpdateTransition(); 239 private static final UpdatedNodesTransition UPDATED_NODES_TRANSITION = 240 new UpdatedNodesTransition(); 241 242 protected static final 243 StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 244 stateMachineFactory 245 = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 246 (JobStateInternal.NEW) 247 248 // Transitions from NEW state 249 .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, 250 JobEventType.JOB_DIAGNOSTIC_UPDATE, 251 DIAGNOSTIC_UPDATE_TRANSITION) 252 .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, 253 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 254 .addTransition 255 (JobStateInternal.NEW, 256 EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW), 257 JobEventType.JOB_INIT, 258 new InitTransition()) 259 .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT, 260 JobEventType.JOB_INIT_FAILED, 261 new InitFailedTransition()) 262 .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED, 263 JobEventType.JOB_KILL, 264 new KillNewJobTransition()) 265 .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR, 266 JobEventType.INTERNAL_ERROR, 267 INTERNAL_ERROR_TRANSITION) 268 .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT, 269 JobEventType.JOB_AM_REBOOT, 270 INTERNAL_REBOOT_TRANSITION) 271 // Ignore-able events 272 .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, 273 JobEventType.JOB_UPDATED_NODES) 274 275 // Transitions from INITED state 276 .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, 277 JobEventType.JOB_DIAGNOSTIC_UPDATE, 278 DIAGNOSTIC_UPDATE_TRANSITION) 279 .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, 280 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 281 .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, 282 JobEventType.JOB_START, 283 new StartTransition()) 284 .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED, 285 JobEventType.JOB_KILL, 286 new KillInitedJobTransition()) 287 .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR, 288 JobEventType.INTERNAL_ERROR, 289 INTERNAL_ERROR_TRANSITION) 290 .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT, 291 JobEventType.JOB_AM_REBOOT, 292 INTERNAL_REBOOT_TRANSITION) 293 // Ignore-able events 294 .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, 295 JobEventType.JOB_UPDATED_NODES) 296 297 // Transitions from SETUP state 298 .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP, 299 JobEventType.JOB_DIAGNOSTIC_UPDATE, 300 DIAGNOSTIC_UPDATE_TRANSITION) 301 .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP, 302 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 303 .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, 304 JobEventType.JOB_SETUP_COMPLETED, 305 new SetupCompletedTransition()) 306 .addTransition(JobStateInternal.SETUP, JobStateInternal.FAIL_ABORT, 307 JobEventType.JOB_SETUP_FAILED, 308 new SetupFailedTransition()) 309 .addTransition(JobStateInternal.SETUP, JobStateInternal.KILL_ABORT, 310 JobEventType.JOB_KILL, 311 new KilledDuringSetupTransition()) 312 .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR, 313 JobEventType.INTERNAL_ERROR, 314 INTERNAL_ERROR_TRANSITION) 315 .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT, 316 JobEventType.JOB_AM_REBOOT, 317 INTERNAL_REBOOT_TRANSITION) 318 // Ignore-able events 319 .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP, 320 JobEventType.JOB_UPDATED_NODES) 321 322 // Transitions from RUNNING state 323 .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 324 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 325 TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) 326 .addTransition 327 (JobStateInternal.RUNNING, 328 EnumSet.of(JobStateInternal.RUNNING, 329 JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT, 330 JobStateInternal.FAIL_ABORT), 331 JobEventType.JOB_TASK_COMPLETED, 332 new TaskCompletedTransition()) 333 .addTransition 334 (JobStateInternal.RUNNING, 335 EnumSet.of(JobStateInternal.RUNNING, 336 JobStateInternal.COMMITTING), 337 JobEventType.JOB_COMPLETED, 338 new JobNoTasksCompletedTransition()) 339 .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT, 340 JobEventType.JOB_KILL, new KillTasksTransition()) 341 .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 342 JobEventType.JOB_UPDATED_NODES, 343 UPDATED_NODES_TRANSITION) 344 .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 345 JobEventType.JOB_MAP_TASK_RESCHEDULED, 346 new MapTaskRescheduledTransition()) 347 .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 348 JobEventType.JOB_DIAGNOSTIC_UPDATE, 349 DIAGNOSTIC_UPDATE_TRANSITION) 350 .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 351 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 352 .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING, 353 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 354 new TaskAttemptFetchFailureTransition()) 355 .addTransition( 356 JobStateInternal.RUNNING, 357 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 358 INTERNAL_ERROR_TRANSITION) 359 .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT, 360 JobEventType.JOB_AM_REBOOT, 361 INTERNAL_REBOOT_TRANSITION) 362 363 // Transitions from KILL_WAIT state. 364 .addTransition 365 (JobStateInternal.KILL_WAIT, 366 EnumSet.of(JobStateInternal.KILL_WAIT, 367 JobStateInternal.KILL_ABORT), 368 JobEventType.JOB_TASK_COMPLETED, 369 new KillWaitTaskCompletedTransition()) 370 .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, 371 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 372 TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) 373 .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, 374 JobEventType.JOB_DIAGNOSTIC_UPDATE, 375 DIAGNOSTIC_UPDATE_TRANSITION) 376 .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, 377 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 378 .addTransition( 379 JobStateInternal.KILL_WAIT, 380 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 381 INTERNAL_ERROR_TRANSITION) 382 // Ignore-able events 383 .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT, 384 EnumSet.of(JobEventType.JOB_KILL, 385 JobEventType.JOB_UPDATED_NODES, 386 JobEventType.JOB_MAP_TASK_RESCHEDULED, 387 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 388 JobEventType.JOB_AM_REBOOT)) 389 390 // Transitions from COMMITTING state 391 .addTransition(JobStateInternal.COMMITTING, 392 JobStateInternal.SUCCEEDED, 393 JobEventType.JOB_COMMIT_COMPLETED, 394 new CommitSucceededTransition()) 395 .addTransition(JobStateInternal.COMMITTING, 396 JobStateInternal.FAIL_ABORT, 397 JobEventType.JOB_COMMIT_FAILED, 398 new CommitFailedTransition()) 399 .addTransition(JobStateInternal.COMMITTING, 400 JobStateInternal.KILL_ABORT, 401 JobEventType.JOB_KILL, 402 new KilledDuringCommitTransition()) 403 .addTransition(JobStateInternal.COMMITTING, 404 JobStateInternal.COMMITTING, 405 JobEventType.JOB_DIAGNOSTIC_UPDATE, 406 DIAGNOSTIC_UPDATE_TRANSITION) 407 .addTransition(JobStateInternal.COMMITTING, 408 JobStateInternal.COMMITTING, 409 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 410 .addTransition(JobStateInternal.COMMITTING, 411 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 412 INTERNAL_ERROR_TRANSITION) 413 .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT, 414 JobEventType.JOB_AM_REBOOT, 415 INTERNAL_REBOOT_TRANSITION) 416 // Ignore-able events 417 .addTransition(JobStateInternal.COMMITTING, 418 JobStateInternal.COMMITTING, 419 EnumSet.of(JobEventType.JOB_UPDATED_NODES, 420 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) 421 422 // Transitions from SUCCEEDED state 423 .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, 424 JobEventType.JOB_DIAGNOSTIC_UPDATE, 425 DIAGNOSTIC_UPDATE_TRANSITION) 426 .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, 427 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 428 .addTransition( 429 JobStateInternal.SUCCEEDED, 430 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 431 INTERNAL_ERROR_TRANSITION) 432 // Ignore-able events 433 .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, 434 EnumSet.of(JobEventType.JOB_KILL, 435 JobEventType.JOB_UPDATED_NODES, 436 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 437 JobEventType.JOB_AM_REBOOT, 438 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 439 JobEventType.JOB_MAP_TASK_RESCHEDULED)) 440 441 // Transitions from FAIL_WAIT state 442 .addTransition(JobStateInternal.FAIL_WAIT, 443 JobStateInternal.FAIL_WAIT, 444 JobEventType.JOB_DIAGNOSTIC_UPDATE, 445 DIAGNOSTIC_UPDATE_TRANSITION) 446 .addTransition(JobStateInternal.FAIL_WAIT, 447 JobStateInternal.FAIL_WAIT, 448 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 449 .addTransition(JobStateInternal.FAIL_WAIT, 450 EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT), 451 JobEventType.JOB_TASK_COMPLETED, 452 new JobFailWaitTransition()) 453 .addTransition(JobStateInternal.FAIL_WAIT, 454 JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT, 455 new JobFailWaitTimedOutTransition()) 456 .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED, 457 JobEventType.JOB_KILL, 458 new KilledDuringAbortTransition()) 459 .addTransition(JobStateInternal.FAIL_WAIT, 460 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 461 INTERNAL_ERROR_TRANSITION) 462 // Ignore-able events 463 .addTransition(JobStateInternal.FAIL_WAIT, 464 JobStateInternal.FAIL_WAIT, 465 EnumSet.of(JobEventType.JOB_UPDATED_NODES, 466 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 467 JobEventType.JOB_MAP_TASK_RESCHEDULED, 468 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 469 JobEventType.JOB_AM_REBOOT)) 470 471 //Transitions from FAIL_ABORT state 472 .addTransition(JobStateInternal.FAIL_ABORT, 473 JobStateInternal.FAIL_ABORT, 474 JobEventType.JOB_DIAGNOSTIC_UPDATE, 475 DIAGNOSTIC_UPDATE_TRANSITION) 476 .addTransition(JobStateInternal.FAIL_ABORT, 477 JobStateInternal.FAIL_ABORT, 478 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 479 .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAILED, 480 JobEventType.JOB_ABORT_COMPLETED, 481 new JobAbortCompletedTransition()) 482 .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.KILLED, 483 JobEventType.JOB_KILL, 484 new KilledDuringAbortTransition()) 485 .addTransition(JobStateInternal.FAIL_ABORT, 486 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 487 INTERNAL_ERROR_TRANSITION) 488 // Ignore-able events 489 .addTransition(JobStateInternal.FAIL_ABORT, 490 JobStateInternal.FAIL_ABORT, 491 EnumSet.of(JobEventType.JOB_UPDATED_NODES, 492 JobEventType.JOB_TASK_COMPLETED, 493 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 494 JobEventType.JOB_MAP_TASK_RESCHEDULED, 495 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 496 JobEventType.JOB_COMMIT_COMPLETED, 497 JobEventType.JOB_COMMIT_FAILED, 498 JobEventType.JOB_AM_REBOOT, 499 JobEventType.JOB_FAIL_WAIT_TIMEDOUT)) 500 501 // Transitions from KILL_ABORT state 502 .addTransition(JobStateInternal.KILL_ABORT, 503 JobStateInternal.KILL_ABORT, 504 JobEventType.JOB_DIAGNOSTIC_UPDATE, 505 DIAGNOSTIC_UPDATE_TRANSITION) 506 .addTransition(JobStateInternal.KILL_ABORT, 507 JobStateInternal.KILL_ABORT, 508 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 509 .addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED, 510 JobEventType.JOB_ABORT_COMPLETED, 511 new JobAbortCompletedTransition()) 512 .addTransition(JobStateInternal.KILL_ABORT, JobStateInternal.KILLED, 513 JobEventType.JOB_KILL, 514 new KilledDuringAbortTransition()) 515 .addTransition(JobStateInternal.KILL_ABORT, 516 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 517 INTERNAL_ERROR_TRANSITION) 518 // Ignore-able events 519 .addTransition(JobStateInternal.KILL_ABORT, 520 JobStateInternal.KILL_ABORT, 521 EnumSet.of(JobEventType.JOB_UPDATED_NODES, 522 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 523 JobEventType.JOB_SETUP_COMPLETED, 524 JobEventType.JOB_SETUP_FAILED, 525 JobEventType.JOB_COMMIT_COMPLETED, 526 JobEventType.JOB_COMMIT_FAILED, 527 JobEventType.JOB_AM_REBOOT)) 528 529 // Transitions from FAILED state 530 .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, 531 JobEventType.JOB_DIAGNOSTIC_UPDATE, 532 DIAGNOSTIC_UPDATE_TRANSITION) 533 .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, 534 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 535 .addTransition( 536 JobStateInternal.FAILED, 537 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 538 INTERNAL_ERROR_TRANSITION) 539 // Ignore-able events 540 .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, 541 EnumSet.of(JobEventType.JOB_KILL, 542 JobEventType.JOB_UPDATED_NODES, 543 JobEventType.JOB_TASK_COMPLETED, 544 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 545 JobEventType.JOB_MAP_TASK_RESCHEDULED, 546 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 547 JobEventType.JOB_SETUP_COMPLETED, 548 JobEventType.JOB_SETUP_FAILED, 549 JobEventType.JOB_COMMIT_COMPLETED, 550 JobEventType.JOB_COMMIT_FAILED, 551 JobEventType.JOB_ABORT_COMPLETED, 552 JobEventType.JOB_AM_REBOOT)) 553 554 // Transitions from KILLED state 555 .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, 556 JobEventType.JOB_DIAGNOSTIC_UPDATE, 557 DIAGNOSTIC_UPDATE_TRANSITION) 558 .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, 559 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 560 .addTransition( 561 JobStateInternal.KILLED, 562 JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, 563 INTERNAL_ERROR_TRANSITION) 564 // Ignore-able events 565 .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED, 566 EnumSet.of(JobEventType.JOB_KILL, 567 JobEventType.JOB_START, 568 JobEventType.JOB_UPDATED_NODES, 569 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 570 JobEventType.JOB_SETUP_COMPLETED, 571 JobEventType.JOB_SETUP_FAILED, 572 JobEventType.JOB_COMMIT_COMPLETED, 573 JobEventType.JOB_COMMIT_FAILED, 574 JobEventType.JOB_ABORT_COMPLETED, 575 JobEventType.JOB_AM_REBOOT)) 576 577 // No transitions from INTERNAL_ERROR state. Ignore all. 578 .addTransition( 579 JobStateInternal.ERROR, 580 JobStateInternal.ERROR, 581 EnumSet.of(JobEventType.JOB_INIT, 582 JobEventType.JOB_KILL, 583 JobEventType.JOB_TASK_COMPLETED, 584 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 585 JobEventType.JOB_MAP_TASK_RESCHEDULED, 586 JobEventType.JOB_DIAGNOSTIC_UPDATE, 587 JobEventType.JOB_UPDATED_NODES, 588 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 589 JobEventType.JOB_SETUP_COMPLETED, 590 JobEventType.JOB_SETUP_FAILED, 591 JobEventType.JOB_COMMIT_COMPLETED, 592 JobEventType.JOB_COMMIT_FAILED, 593 JobEventType.JOB_ABORT_COMPLETED, 594 JobEventType.INTERNAL_ERROR, 595 JobEventType.JOB_AM_REBOOT)) 596 .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR, 597 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 598 599 // No transitions from AM_REBOOT state. Ignore all. 600 .addTransition( 601 JobStateInternal.REBOOT, 602 JobStateInternal.REBOOT, 603 EnumSet.of(JobEventType.JOB_INIT, 604 JobEventType.JOB_KILL, 605 JobEventType.JOB_TASK_COMPLETED, 606 JobEventType.JOB_TASK_ATTEMPT_COMPLETED, 607 JobEventType.JOB_MAP_TASK_RESCHEDULED, 608 JobEventType.JOB_DIAGNOSTIC_UPDATE, 609 JobEventType.JOB_UPDATED_NODES, 610 JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, 611 JobEventType.JOB_SETUP_COMPLETED, 612 JobEventType.JOB_SETUP_FAILED, 613 JobEventType.JOB_COMMIT_COMPLETED, 614 JobEventType.JOB_COMMIT_FAILED, 615 JobEventType.JOB_ABORT_COMPLETED, 616 JobEventType.INTERNAL_ERROR, 617 JobEventType.JOB_AM_REBOOT)) 618 .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT, 619 JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) 620 621 // create the topology tables 622 .installTopology(); 623 624 private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine; 625 626 //changing fields while the job is running 627 private int numMapTasks; 628 private int numReduceTasks; 629 private int completedTaskCount = 0; 630 private int succeededMapTaskCount = 0; 631 private int succeededReduceTaskCount = 0; 632 private int failedMapTaskCount = 0; 633 private int failedReduceTaskCount = 0; 634 private int killedMapTaskCount = 0; 635 private int killedReduceTaskCount = 0; 636 private long startTime; 637 private long finishTime; 638 private float setupProgress; 639 private float mapProgress; 640 private float reduceProgress; 641 private float cleanupProgress; 642 private boolean isUber = false; 643 644 private Credentials jobCredentials; 645 private Token<JobTokenIdentifier> jobToken; 646 private JobTokenSecretManager jobTokenSecretManager; 647 648 private JobStateInternal forcedState = null; 649 650 //Executor used for running future tasks. 651 private ScheduledThreadPoolExecutor executor; 652 private ScheduledFuture failWaitTriggerScheduledFuture; 653 654 private JobState lastNonFinalState = JobState.NEW; 655 JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, JobTokenSecretManager jobTokenSecretManager, Credentials jobCredentials, Clock clock, Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics, OutputCommitter committer, boolean newApiCommitter, String userName, long appSubmitTime, List<AMInfo> amInfos, AppContext appContext, JobStateInternal forcedState, String forcedDiagnostic)656 public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, 657 Configuration conf, EventHandler eventHandler, 658 TaskAttemptListener taskAttemptListener, 659 JobTokenSecretManager jobTokenSecretManager, 660 Credentials jobCredentials, Clock clock, 661 Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics, 662 OutputCommitter committer, boolean newApiCommitter, String userName, 663 long appSubmitTime, List<AMInfo> amInfos, AppContext appContext, 664 JobStateInternal forcedState, String forcedDiagnostic) { 665 this.applicationAttemptId = applicationAttemptId; 666 this.jobId = jobId; 667 this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>"); 668 this.conf = new JobConf(conf); 669 this.metrics = metrics; 670 this.clock = clock; 671 this.completedTasksFromPreviousRun = completedTasksFromPreviousRun; 672 this.amInfos = amInfos; 673 this.appContext = appContext; 674 this.userName = userName; 675 this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default"); 676 this.appSubmitTime = appSubmitTime; 677 this.oldJobId = TypeConverter.fromYarn(jobId); 678 this.committer = committer; 679 this.newApiCommitter = newApiCommitter; 680 681 this.taskAttemptListener = taskAttemptListener; 682 this.eventHandler = eventHandler; 683 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 684 this.readLock = readWriteLock.readLock(); 685 this.writeLock = readWriteLock.writeLock(); 686 687 this.jobCredentials = jobCredentials; 688 this.jobTokenSecretManager = jobTokenSecretManager; 689 690 this.aclsManager = new JobACLsManager(conf); 691 this.username = System.getProperty("user.name"); 692 this.jobACLs = aclsManager.constructJobACLs(conf); 693 694 ThreadFactory threadFactory = new ThreadFactoryBuilder() 695 .setNameFormat("Job Fail Wait Timeout Monitor #%d") 696 .setDaemon(true) 697 .build(); 698 this.executor = new ScheduledThreadPoolExecutor(1, threadFactory); 699 700 // This "this leak" is okay because the retained pointer is in an 701 // instance variable. 702 stateMachine = stateMachineFactory.make(this); 703 this.forcedState = forcedState; 704 if(forcedDiagnostic != null) { 705 this.diagnostics.add(forcedDiagnostic); 706 } 707 708 this.maxAllowedFetchFailuresFraction = conf.getFloat( 709 MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION, 710 MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION); 711 this.maxFetchFailuresNotifications = conf.getInt( 712 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, 713 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); 714 } 715 getStateMachine()716 protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() { 717 return stateMachine; 718 } 719 720 @Override getID()721 public JobId getID() { 722 return jobId; 723 } 724 getEventHandler()725 EventHandler getEventHandler() { 726 return this.eventHandler; 727 } 728 getJobContext()729 JobContext getJobContext() { 730 return this.jobContext; 731 } 732 733 @Override checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)734 public boolean checkAccess(UserGroupInformation callerUGI, 735 JobACL jobOperation) { 736 AccessControlList jobACL = jobACLs.get(jobOperation); 737 if (jobACL == null) { 738 return true; 739 } 740 return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL); 741 } 742 743 @Override getTask(TaskId taskID)744 public Task getTask(TaskId taskID) { 745 readLock.lock(); 746 try { 747 return tasks.get(taskID); 748 } finally { 749 readLock.unlock(); 750 } 751 } 752 753 @Override getCompletedMaps()754 public int getCompletedMaps() { 755 readLock.lock(); 756 try { 757 return succeededMapTaskCount + failedMapTaskCount + killedMapTaskCount; 758 } finally { 759 readLock.unlock(); 760 } 761 } 762 763 @Override getCompletedReduces()764 public int getCompletedReduces() { 765 readLock.lock(); 766 try { 767 return succeededReduceTaskCount + failedReduceTaskCount 768 + killedReduceTaskCount; 769 } finally { 770 readLock.unlock(); 771 } 772 } 773 774 @Override isUber()775 public boolean isUber() { 776 return isUber; 777 } 778 779 @Override getAllCounters()780 public Counters getAllCounters() { 781 782 readLock.lock(); 783 784 try { 785 JobStateInternal state = getInternalState(); 786 if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED 787 || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) { 788 this.mayBeConstructFinalFullCounters(); 789 return fullCounters; 790 } 791 792 Counters counters = new Counters(); 793 counters.incrAllCounters(jobCounters); 794 return incrTaskCounters(counters, tasks.values()); 795 796 } finally { 797 readLock.unlock(); 798 } 799 } 800 incrTaskCounters( Counters counters, Collection<Task> tasks)801 public static Counters incrTaskCounters( 802 Counters counters, Collection<Task> tasks) { 803 for (Task task : tasks) { 804 counters.incrAllCounters(task.getCounters()); 805 } 806 return counters; 807 } 808 809 @Override getTaskAttemptCompletionEvents( int fromEventId, int maxEvents)810 public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents( 811 int fromEventId, int maxEvents) { 812 TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS; 813 readLock.lock(); 814 try { 815 if (taskAttemptCompletionEvents.size() > fromEventId) { 816 int actualMax = Math.min(maxEvents, 817 (taskAttemptCompletionEvents.size() - fromEventId)); 818 events = taskAttemptCompletionEvents.subList(fromEventId, 819 actualMax + fromEventId).toArray(events); 820 } 821 return events; 822 } finally { 823 readLock.unlock(); 824 } 825 } 826 827 @Override getMapAttemptCompletionEvents( int startIndex, int maxEvents)828 public TaskCompletionEvent[] getMapAttemptCompletionEvents( 829 int startIndex, int maxEvents) { 830 TaskCompletionEvent[] events = EMPTY_TASK_COMPLETION_EVENTS; 831 readLock.lock(); 832 try { 833 if (mapAttemptCompletionEvents.size() > startIndex) { 834 int actualMax = Math.min(maxEvents, 835 (mapAttemptCompletionEvents.size() - startIndex)); 836 events = mapAttemptCompletionEvents.subList(startIndex, 837 actualMax + startIndex).toArray(events); 838 } 839 return events; 840 } finally { 841 readLock.unlock(); 842 } 843 } 844 845 @Override getDiagnostics()846 public List<String> getDiagnostics() { 847 readLock.lock(); 848 try { 849 return diagnostics; 850 } finally { 851 readLock.unlock(); 852 } 853 } 854 855 @Override getReport()856 public JobReport getReport() { 857 readLock.lock(); 858 try { 859 JobState state = getState(); 860 861 // jobFile can be null if the job is not yet inited. 862 String jobFile = 863 remoteJobConfFile == null ? "" : remoteJobConfFile.toString(); 864 865 StringBuilder diagsb = new StringBuilder(); 866 for (String s : getDiagnostics()) { 867 diagsb.append(s).append("\n"); 868 } 869 870 if (getInternalState() == JobStateInternal.NEW) { 871 return MRBuilderUtils.newJobReport(jobId, jobName, username, state, 872 appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, 873 cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); 874 } 875 876 computeProgress(); 877 JobReport report = MRBuilderUtils.newJobReport(jobId, jobName, username, 878 state, appSubmitTime, startTime, finishTime, setupProgress, 879 this.mapProgress, this.reduceProgress, 880 cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); 881 return report; 882 } finally { 883 readLock.unlock(); 884 } 885 } 886 887 @Override getProgress()888 public float getProgress() { 889 this.readLock.lock(); 890 try { 891 computeProgress(); 892 return (this.setupProgress * this.setupWeight + this.cleanupProgress 893 * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress 894 * this.reduceWeight); 895 } finally { 896 this.readLock.unlock(); 897 } 898 } 899 computeProgress()900 private void computeProgress() { 901 this.readLock.lock(); 902 try { 903 float mapProgress = 0f; 904 float reduceProgress = 0f; 905 for (Task task : this.tasks.values()) { 906 if (task.getType() == TaskType.MAP) { 907 mapProgress += (task.isFinished() ? 1f : task.getProgress()); 908 } else { 909 reduceProgress += (task.isFinished() ? 1f : task.getProgress()); 910 } 911 } 912 if (this.numMapTasks != 0) { 913 mapProgress = mapProgress / this.numMapTasks; 914 } 915 if (this.numReduceTasks != 0) { 916 reduceProgress = reduceProgress / this.numReduceTasks; 917 } 918 this.mapProgress = mapProgress; 919 this.reduceProgress = reduceProgress; 920 } finally { 921 this.readLock.unlock(); 922 } 923 } 924 925 @Override getTasks()926 public Map<TaskId, Task> getTasks() { 927 synchronized (tasksSyncHandle) { 928 lazyTasksCopyNeeded = true; 929 return Collections.unmodifiableMap(tasks); 930 } 931 } 932 933 @Override getTasks(TaskType taskType)934 public Map<TaskId,Task> getTasks(TaskType taskType) { 935 Map<TaskId, Task> localTasksCopy = tasks; 936 Map<TaskId, Task> result = new HashMap<TaskId, Task>(); 937 Set<TaskId> tasksOfGivenType = null; 938 readLock.lock(); 939 try { 940 if (TaskType.MAP == taskType) { 941 tasksOfGivenType = mapTasks; 942 } else { 943 tasksOfGivenType = reduceTasks; 944 } 945 for (TaskId taskID : tasksOfGivenType) 946 result.put(taskID, localTasksCopy.get(taskID)); 947 return result; 948 } finally { 949 readLock.unlock(); 950 } 951 } 952 953 @Override getState()954 public JobState getState() { 955 readLock.lock(); 956 try { 957 JobState state = getExternalState(getInternalState()); 958 if (!appContext.hasSuccessfullyUnregistered() 959 && (state == JobState.SUCCEEDED || state == JobState.FAILED 960 || state == JobState.KILLED || state == JobState.ERROR)) { 961 return lastNonFinalState; 962 } else { 963 return state; 964 } 965 } finally { 966 readLock.unlock(); 967 } 968 } 969 scheduleTasks(Set<TaskId> taskIDs, boolean recoverTaskOutput)970 protected void scheduleTasks(Set<TaskId> taskIDs, 971 boolean recoverTaskOutput) { 972 for (TaskId taskID : taskIDs) { 973 TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID); 974 if (taskInfo != null) { 975 eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo, 976 committer, recoverTaskOutput)); 977 } else { 978 eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE)); 979 } 980 } 981 } 982 983 @Override 984 /** 985 * The only entry point to change the Job. 986 */ handle(JobEvent event)987 public void handle(JobEvent event) { 988 if (LOG.isDebugEnabled()) { 989 LOG.debug("Processing " + event.getJobId() + " of type " 990 + event.getType()); 991 } 992 try { 993 writeLock.lock(); 994 JobStateInternal oldState = getInternalState(); 995 try { 996 getStateMachine().doTransition(event.getType(), event); 997 } catch (InvalidStateTransitonException e) { 998 LOG.error("Can't handle this event at current state", e); 999 addDiagnostic("Invalid event " + event.getType() + 1000 " on Job " + this.jobId); 1001 eventHandler.handle(new JobEvent(this.jobId, 1002 JobEventType.INTERNAL_ERROR)); 1003 } 1004 //notify the eventhandler of state change 1005 if (oldState != getInternalState()) { 1006 LOG.info(jobId + "Job Transitioned from " + oldState + " to " 1007 + getInternalState()); 1008 rememberLastNonFinalState(oldState); 1009 } 1010 } 1011 1012 finally { 1013 writeLock.unlock(); 1014 } 1015 } 1016 rememberLastNonFinalState(JobStateInternal stateInternal)1017 private void rememberLastNonFinalState(JobStateInternal stateInternal) { 1018 JobState state = getExternalState(stateInternal); 1019 // if state is not the final state, set lastNonFinalState 1020 if (state != JobState.SUCCEEDED && state != JobState.FAILED 1021 && state != JobState.KILLED && state != JobState.ERROR) { 1022 lastNonFinalState = state; 1023 } 1024 } 1025 1026 @Private getInternalState()1027 public JobStateInternal getInternalState() { 1028 readLock.lock(); 1029 try { 1030 if(forcedState != null) { 1031 return forcedState; 1032 } 1033 return getStateMachine().getCurrentState(); 1034 } finally { 1035 readLock.unlock(); 1036 } 1037 } 1038 getExternalState(JobStateInternal smState)1039 private JobState getExternalState(JobStateInternal smState) { 1040 switch (smState) { 1041 case KILL_WAIT: 1042 case KILL_ABORT: 1043 return JobState.KILLED; 1044 case SETUP: 1045 case COMMITTING: 1046 return JobState.RUNNING; 1047 case FAIL_WAIT: 1048 case FAIL_ABORT: 1049 return JobState.FAILED; 1050 case REBOOT: 1051 if (appContext.isLastAMRetry()) { 1052 return JobState.ERROR; 1053 } else { 1054 // In case of not last retry, return the external state as RUNNING since 1055 // otherwise JobClient will exit when it polls the AM for job state 1056 return JobState.RUNNING; 1057 } 1058 default: 1059 return JobState.valueOf(smState.name()); 1060 } 1061 } 1062 1063 1064 //helpful in testing addTask(Task task)1065 protected void addTask(Task task) { 1066 synchronized (tasksSyncHandle) { 1067 if (lazyTasksCopyNeeded) { 1068 Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>(); 1069 newTasks.putAll(tasks); 1070 tasks = newTasks; 1071 lazyTasksCopyNeeded = false; 1072 } 1073 } 1074 tasks.put(task.getID(), task); 1075 if (task.getType() == TaskType.MAP) { 1076 mapTasks.add(task.getID()); 1077 } else if (task.getType() == TaskType.REDUCE) { 1078 reduceTasks.add(task.getID()); 1079 } 1080 metrics.waitingTask(task); 1081 } 1082 setFinishTime()1083 void setFinishTime() { 1084 finishTime = clock.getTime(); 1085 } 1086 logJobHistoryFinishedEvent()1087 void logJobHistoryFinishedEvent() { 1088 this.setFinishTime(); 1089 JobFinishedEvent jfe = createJobFinishedEvent(this); 1090 LOG.info("Calling handler for JobFinishedEvent "); 1091 this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe)); 1092 } 1093 1094 /** 1095 * Create the default file System for this job. 1096 * @param conf the conf object 1097 * @return the default filesystem for this job 1098 * @throws IOException 1099 */ getFileSystem(Configuration conf)1100 protected FileSystem getFileSystem(Configuration conf) throws IOException { 1101 return FileSystem.get(conf); 1102 } 1103 checkReadyForCommit()1104 protected JobStateInternal checkReadyForCommit() { 1105 JobStateInternal currentState = getInternalState(); 1106 if (completedTaskCount == tasks.size() 1107 && currentState == JobStateInternal.RUNNING) { 1108 eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext())); 1109 return JobStateInternal.COMMITTING; 1110 } 1111 // return the current state as job not ready to commit yet 1112 return getInternalState(); 1113 } 1114 finished(JobStateInternal finalState)1115 JobStateInternal finished(JobStateInternal finalState) { 1116 if (getInternalState() == JobStateInternal.RUNNING) { 1117 metrics.endRunningJob(this); 1118 } 1119 if (finishTime == 0) setFinishTime(); 1120 eventHandler.handle(new JobFinishEvent(jobId)); 1121 1122 switch (finalState) { 1123 case KILLED: 1124 metrics.killedJob(this); 1125 break; 1126 case REBOOT: 1127 case ERROR: 1128 case FAILED: 1129 metrics.failedJob(this); 1130 break; 1131 case SUCCEEDED: 1132 metrics.completedJob(this); 1133 break; 1134 default: 1135 throw new IllegalArgumentException("Illegal job state: " + finalState); 1136 } 1137 return finalState; 1138 } 1139 1140 @Override getUserName()1141 public String getUserName() { 1142 return userName; 1143 } 1144 1145 @Override getQueueName()1146 public String getQueueName() { 1147 return queueName; 1148 } 1149 1150 @Override setQueueName(String queueName)1151 public void setQueueName(String queueName) { 1152 this.queueName = queueName; 1153 JobQueueChangeEvent jqce = new JobQueueChangeEvent(oldJobId, queueName); 1154 eventHandler.handle(new JobHistoryEvent(jobId, jqce)); 1155 } 1156 1157 /* 1158 * (non-Javadoc) 1159 * @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile() 1160 */ 1161 @Override getConfFile()1162 public Path getConfFile() { 1163 return remoteJobConfFile; 1164 } 1165 1166 @Override getName()1167 public String getName() { 1168 return jobName; 1169 } 1170 1171 @Override getTotalMaps()1172 public int getTotalMaps() { 1173 return mapTasks.size(); //FIXME: why indirection? return numMapTasks... 1174 // unless race? how soon can this get called? 1175 } 1176 1177 @Override getTotalReduces()1178 public int getTotalReduces() { 1179 return reduceTasks.size(); //FIXME: why indirection? return numReduceTasks 1180 } 1181 1182 /* 1183 * (non-Javadoc) 1184 * @see org.apache.hadoop.mapreduce.v2.app.job.Job#getJobACLs() 1185 */ 1186 @Override getJobACLs()1187 public Map<JobACL, AccessControlList> getJobACLs() { 1188 return Collections.unmodifiableMap(jobACLs); 1189 } 1190 1191 @Override getAMInfos()1192 public List<AMInfo> getAMInfos() { 1193 return amInfos; 1194 } 1195 1196 /** 1197 * Decide whether job can be run in uber mode based on various criteria. 1198 * @param dataInputLength Total length for all splits 1199 */ makeUberDecision(long dataInputLength)1200 private void makeUberDecision(long dataInputLength) { 1201 //FIXME: need new memory criterion for uber-decision (oops, too late here; 1202 // until AM-resizing supported, 1203 // must depend on job client to pass fat-slot needs) 1204 // these are no longer "system" settings, necessarily; user may override 1205 int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9); 1206 1207 int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); 1208 1209 long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, 1210 fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from 1211 // [File?]InputFormat and default block size 1212 // from that 1213 1214 long sysMemSizeForUberSlot = 1215 conf.getInt(MRJobConfig.MR_AM_VMEM_MB, 1216 MRJobConfig.DEFAULT_MR_AM_VMEM_MB); 1217 1218 long sysCPUSizeForUberSlot = 1219 conf.getInt(MRJobConfig.MR_AM_CPU_VCORES, 1220 MRJobConfig.DEFAULT_MR_AM_CPU_VCORES); 1221 1222 boolean uberEnabled = 1223 conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); 1224 boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps); 1225 boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces); 1226 boolean smallInput = (dataInputLength <= sysMaxBytes); 1227 // ignoring overhead due to UberAM and statics as negligible here: 1228 long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0); 1229 long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0); 1230 long requiredMB = Math.max(requiredMapMB, requiredReduceMB); 1231 int requiredMapCores = conf.getInt( 1232 MRJobConfig.MAP_CPU_VCORES, 1233 MRJobConfig.DEFAULT_MAP_CPU_VCORES); 1234 int requiredReduceCores = conf.getInt( 1235 MRJobConfig.REDUCE_CPU_VCORES, 1236 MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); 1237 int requiredCores = Math.max(requiredMapCores, requiredReduceCores); 1238 if (numReduceTasks == 0) { 1239 requiredMB = requiredMapMB; 1240 requiredCores = requiredMapCores; 1241 } 1242 boolean smallMemory = 1243 (requiredMB <= sysMemSizeForUberSlot) 1244 || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT); 1245 1246 boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot; 1247 boolean notChainJob = !isChainJob(conf); 1248 1249 // User has overall veto power over uberization, or user can modify 1250 // limits (overriding system settings and potentially shooting 1251 // themselves in the head). Note that ChainMapper/Reducer are 1252 // fundamentally incompatible with MR-1220; they employ a blocking 1253 // queue between the maps/reduces and thus require parallel execution, 1254 // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks 1255 // and thus requires sequential execution. 1256 isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks 1257 && smallInput && smallMemory && smallCpu 1258 && notChainJob; 1259 1260 if (isUber) { 1261 LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+" 1262 + numReduceTasks + "r tasks (" + dataInputLength 1263 + " input bytes) will run sequentially on single node."); 1264 1265 // make sure reduces are scheduled only after all map are completed 1266 conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1267 1.0f); 1268 // uber-subtask attempts all get launched on same node; if one fails, 1269 // probably should retry elsewhere, i.e., move entire uber-AM: ergo, 1270 // limit attempts to 1 (or at most 2? probably not...) 1271 conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1); 1272 conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1); 1273 1274 // disable speculation 1275 conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); 1276 conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); 1277 } else { 1278 StringBuilder msg = new StringBuilder(); 1279 msg.append("Not uberizing ").append(jobId).append(" because:"); 1280 if (!uberEnabled) 1281 msg.append(" not enabled;"); 1282 if (!smallNumMapTasks) 1283 msg.append(" too many maps;"); 1284 if (!smallNumReduceTasks) 1285 msg.append(" too many reduces;"); 1286 if (!smallInput) 1287 msg.append(" too much input;"); 1288 if (!smallCpu) 1289 msg.append(" too much CPU;"); 1290 if (!smallMemory) 1291 msg.append(" too much RAM;"); 1292 if (!notChainJob) 1293 msg.append(" chainjob;"); 1294 LOG.info(msg.toString()); 1295 } 1296 } 1297 1298 /** 1299 * ChainMapper and ChainReducer must execute in parallel, so they're not 1300 * compatible with uberization/LocalContainerLauncher (100% sequential). 1301 */ isChainJob(Configuration conf)1302 private boolean isChainJob(Configuration conf) { 1303 boolean isChainJob = false; 1304 try { 1305 String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR); 1306 if (mapClassName != null) { 1307 Class<?> mapClass = Class.forName(mapClassName); 1308 if (ChainMapper.class.isAssignableFrom(mapClass)) 1309 isChainJob = true; 1310 } 1311 } catch (ClassNotFoundException cnfe) { 1312 // don't care; assume it's not derived from ChainMapper 1313 } catch (NoClassDefFoundError ignored) { 1314 } 1315 try { 1316 String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR); 1317 if (reduceClassName != null) { 1318 Class<?> reduceClass = Class.forName(reduceClassName); 1319 if (ChainReducer.class.isAssignableFrom(reduceClass)) 1320 isChainJob = true; 1321 } 1322 } catch (ClassNotFoundException cnfe) { 1323 // don't care; assume it's not derived from ChainReducer 1324 } catch (NoClassDefFoundError ignored) { 1325 } 1326 return isChainJob; 1327 } 1328 actOnUnusableNode(NodeId nodeId, NodeState nodeState)1329 private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { 1330 // rerun previously successful map tasks 1331 List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId); 1332 if(taskAttemptIdList != null) { 1333 String mesg = "TaskAttempt killed because it ran on unusable node " 1334 + nodeId; 1335 for(TaskAttemptId id : taskAttemptIdList) { 1336 if(TaskType.MAP == id.getTaskId().getTaskType()) { 1337 // reschedule only map tasks because their outputs maybe unusable 1338 LOG.info(mesg + ". AttemptId:" + id); 1339 eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); 1340 } 1341 } 1342 } 1343 // currently running task attempts on unusable nodes are handled in 1344 // RMContainerAllocator 1345 } 1346 1347 /* 1348 private int getBlockSize() { 1349 String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR); 1350 if (inputClassName != null) { 1351 Class<?> inputClass - Class.forName(inputClassName); 1352 if (FileInputFormat<K, V>) 1353 } 1354 } 1355 */ 1356 /** 1357 * Get the workflow adjacencies from the job conf 1358 * The string returned is of the form "key"="value" "key"="value" ... 1359 */ getWorkflowAdjacencies(Configuration conf)1360 private static String getWorkflowAdjacencies(Configuration conf) { 1361 int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length(); 1362 Map<String,String> adjacencies = 1363 conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN); 1364 if (adjacencies.isEmpty()) { 1365 return ""; 1366 } 1367 int size = 0; 1368 for (Entry<String,String> entry : adjacencies.entrySet()) { 1369 int keyLen = entry.getKey().length(); 1370 size += keyLen - prefixLen; 1371 size += entry.getValue().length() + 6; 1372 } 1373 StringBuilder sb = new StringBuilder(size); 1374 for (Entry<String,String> entry : adjacencies.entrySet()) { 1375 int keyLen = entry.getKey().length(); 1376 sb.append("\""); 1377 sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen))); 1378 sb.append("\"=\""); 1379 sb.append(escapeString(entry.getValue())); 1380 sb.append("\" "); 1381 } 1382 return sb.toString(); 1383 } 1384 escapeString(String data)1385 public static String escapeString(String data) { 1386 return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR, 1387 new char[] {'"', '=', '.'}); 1388 } 1389 1390 public static class InitTransition 1391 implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { 1392 1393 /** 1394 * Note that this transition method is called directly (and synchronously) 1395 * by MRAppMaster's init() method (i.e., no RPC, no thread-switching; 1396 * just plain sequential call within AM context), so we can trigger 1397 * modifications in AM state from here (at least, if AM is written that 1398 * way; MR version is). 1399 */ 1400 @Override transition(JobImpl job, JobEvent event)1401 public JobStateInternal transition(JobImpl job, JobEvent event) { 1402 job.metrics.submittedJob(job); 1403 job.metrics.preparingJob(job); 1404 1405 if (job.newApiCommitter) { 1406 job.jobContext = new JobContextImpl(job.conf, 1407 job.oldJobId); 1408 } else { 1409 job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( 1410 job.conf, job.oldJobId); 1411 } 1412 1413 try { 1414 setup(job); 1415 job.fs = job.getFileSystem(job.conf); 1416 1417 //log to job history 1418 JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, 1419 job.conf.get(MRJobConfig.JOB_NAME, "test"), 1420 job.conf.get(MRJobConfig.USER_NAME, "mapred"), 1421 job.appSubmitTime, 1422 job.remoteJobConfFile.toString(), 1423 job.jobACLs, job.queueName, 1424 job.conf.get(MRJobConfig.WORKFLOW_ID, ""), 1425 job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), 1426 job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), 1427 getWorkflowAdjacencies(job.conf), 1428 job.conf.get(MRJobConfig.WORKFLOW_TAGS, "")); 1429 job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); 1430 //TODO JH Verify jobACLs, UserName via UGI? 1431 1432 TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId); 1433 job.numMapTasks = taskSplitMetaInfo.length; 1434 job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0); 1435 1436 if (job.numMapTasks == 0 && job.numReduceTasks == 0) { 1437 job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); 1438 } else if (job.numMapTasks == 0) { 1439 job.reduceWeight = 0.9f; 1440 } else if (job.numReduceTasks == 0) { 1441 job.mapWeight = 0.9f; 1442 } else { 1443 job.mapWeight = job.reduceWeight = 0.45f; 1444 } 1445 1446 checkTaskLimits(); 1447 1448 long inputLength = 0; 1449 for (int i = 0; i < job.numMapTasks; ++i) { 1450 inputLength += taskSplitMetaInfo[i].getInputDataLength(); 1451 } 1452 1453 job.makeUberDecision(inputLength); 1454 1455 job.taskAttemptCompletionEvents = 1456 new ArrayList<TaskAttemptCompletionEvent>( 1457 job.numMapTasks + job.numReduceTasks + 10); 1458 job.mapAttemptCompletionEvents = 1459 new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10); 1460 job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>( 1461 job.numMapTasks + job.numReduceTasks + 10); 1462 1463 job.allowedMapFailuresPercent = 1464 job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0); 1465 job.allowedReduceFailuresPercent = 1466 job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0); 1467 1468 // create the Tasks but don't start them yet 1469 createMapTasks(job, inputLength, taskSplitMetaInfo); 1470 createReduceTasks(job); 1471 1472 job.metrics.endPreparingJob(job); 1473 return JobStateInternal.INITED; 1474 } catch (Exception e) { 1475 LOG.warn("Job init failed", e); 1476 job.metrics.endPreparingJob(job); 1477 job.addDiagnostic("Job init failed : " 1478 + StringUtils.stringifyException(e)); 1479 // Leave job in the NEW state. The MR AM will detect that the state is 1480 // not INITED and send a JOB_INIT_FAILED event. 1481 return JobStateInternal.NEW; 1482 } 1483 } 1484 setup(JobImpl job)1485 protected void setup(JobImpl job) throws IOException { 1486 1487 String oldJobIDString = job.oldJobId.toString(); 1488 String user = 1489 UserGroupInformation.getCurrentUser().getShortUserName(); 1490 Path path = MRApps.getStagingAreaDir(job.conf, user); 1491 if(LOG.isDebugEnabled()) { 1492 LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString); 1493 } 1494 1495 job.remoteJobSubmitDir = 1496 FileSystem.get(job.conf).makeQualified( 1497 new Path(path, oldJobIDString)); 1498 job.remoteJobConfFile = 1499 new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); 1500 1501 // Prepare the TaskAttemptListener server for authentication of Containers 1502 // TaskAttemptListener gets the information via jobTokenSecretManager. 1503 JobTokenIdentifier identifier = 1504 new JobTokenIdentifier(new Text(oldJobIDString)); 1505 job.jobToken = 1506 new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager); 1507 job.jobToken.setService(identifier.getJobId()); 1508 // Add it to the jobTokenSecretManager so that TaskAttemptListener server 1509 // can authenticate containers(tasks) 1510 job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken); 1511 LOG.info("Adding job token for " + oldJobIDString 1512 + " to jobTokenSecretManager"); 1513 1514 // If the job client did not setup the shuffle secret then reuse 1515 // the job token secret for the shuffle. 1516 if (TokenCache.getShuffleSecretKey(job.jobCredentials) == null) { 1517 LOG.warn("Shuffle secret key missing from job credentials." 1518 + " Using job token secret as shuffle secret."); 1519 TokenCache.setShuffleSecretKey(job.jobToken.getPassword(), 1520 job.jobCredentials); 1521 } 1522 } 1523 createMapTasks(JobImpl job, long inputLength, TaskSplitMetaInfo[] splits)1524 private void createMapTasks(JobImpl job, long inputLength, 1525 TaskSplitMetaInfo[] splits) { 1526 for (int i=0; i < job.numMapTasks; ++i) { 1527 TaskImpl task = 1528 new MapTaskImpl(job.jobId, i, 1529 job.eventHandler, 1530 job.remoteJobConfFile, 1531 job.conf, splits[i], 1532 job.taskAttemptListener, 1533 job.jobToken, job.jobCredentials, 1534 job.clock, 1535 job.applicationAttemptId.getAttemptId(), 1536 job.metrics, job.appContext); 1537 job.addTask(task); 1538 } 1539 LOG.info("Input size for job " + job.jobId + " = " + inputLength 1540 + ". Number of splits = " + splits.length); 1541 } 1542 createReduceTasks(JobImpl job)1543 private void createReduceTasks(JobImpl job) { 1544 for (int i = 0; i < job.numReduceTasks; i++) { 1545 TaskImpl task = 1546 new ReduceTaskImpl(job.jobId, i, 1547 job.eventHandler, 1548 job.remoteJobConfFile, 1549 job.conf, job.numMapTasks, 1550 job.taskAttemptListener, job.jobToken, 1551 job.jobCredentials, job.clock, 1552 job.applicationAttemptId.getAttemptId(), 1553 job.metrics, job.appContext); 1554 job.addTask(task); 1555 } 1556 LOG.info("Number of reduces for job " + job.jobId + " = " 1557 + job.numReduceTasks); 1558 } 1559 createSplits(JobImpl job, JobId jobId)1560 protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { 1561 TaskSplitMetaInfo[] allTaskSplitMetaInfo; 1562 try { 1563 allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo( 1564 job.oldJobId, job.fs, 1565 job.conf, 1566 job.remoteJobSubmitDir); 1567 } catch (IOException e) { 1568 throw new YarnRuntimeException(e); 1569 } 1570 return allTaskSplitMetaInfo; 1571 } 1572 1573 /** 1574 * If the number of tasks are greater than the configured value 1575 * throw an exception that will fail job initialization 1576 */ checkTaskLimits()1577 private void checkTaskLimits() { 1578 // no code, for now 1579 } 1580 } // end of InitTransition 1581 1582 private static class InitFailedTransition 1583 implements SingleArcTransition<JobImpl, JobEvent> { 1584 @Override transition(JobImpl job, JobEvent event)1585 public void transition(JobImpl job, JobEvent event) { 1586 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1587 job.jobContext, 1588 org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); 1589 } 1590 } 1591 1592 private static class SetupCompletedTransition 1593 implements SingleArcTransition<JobImpl, JobEvent> { 1594 @Override transition(JobImpl job, JobEvent event)1595 public void transition(JobImpl job, JobEvent event) { 1596 job.setupProgress = 1.0f; 1597 job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0); 1598 job.scheduleTasks(job.reduceTasks, true); 1599 1600 // If we have no tasks, just transition to job completed 1601 if (job.numReduceTasks == 0 && job.numMapTasks == 0) { 1602 job.eventHandler.handle(new JobEvent(job.jobId, 1603 JobEventType.JOB_COMPLETED)); 1604 } 1605 } 1606 } 1607 1608 private static class SetupFailedTransition 1609 implements SingleArcTransition<JobImpl, JobEvent> { 1610 @Override transition(JobImpl job, JobEvent event)1611 public void transition(JobImpl job, JobEvent event) { 1612 job.metrics.endRunningJob(job); 1613 job.addDiagnostic("Job setup failed : " 1614 + ((JobSetupFailedEvent) event).getMessage()); 1615 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1616 job.jobContext, 1617 org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); 1618 } 1619 } 1620 1621 public static class StartTransition 1622 implements SingleArcTransition<JobImpl, JobEvent> { 1623 /** 1624 * This transition executes in the event-dispatcher thread, though it's 1625 * triggered in MRAppMaster's startJobs() method. 1626 */ 1627 @Override transition(JobImpl job, JobEvent event)1628 public void transition(JobImpl job, JobEvent event) { 1629 JobStartEvent jse = (JobStartEvent) event; 1630 if (jse.getRecoveredJobStartTime() != 0) { 1631 job.startTime = jse.getRecoveredJobStartTime(); 1632 } else { 1633 job.startTime = job.clock.getTime(); 1634 } 1635 JobInitedEvent jie = 1636 new JobInitedEvent(job.oldJobId, 1637 job.startTime, 1638 job.numMapTasks, job.numReduceTasks, 1639 job.getState().toString(), 1640 job.isUber()); 1641 job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie)); 1642 JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId, 1643 job.appSubmitTime, job.startTime); 1644 job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice)); 1645 job.metrics.runningJob(job); 1646 1647 job.eventHandler.handle(new CommitterJobSetupEvent( 1648 job.jobId, job.jobContext)); 1649 } 1650 } 1651 unsuccessfulFinish(JobStateInternal finalState)1652 private void unsuccessfulFinish(JobStateInternal finalState) { 1653 if (finishTime == 0) setFinishTime(); 1654 cleanupProgress = 1.0f; 1655 JobUnsuccessfulCompletionEvent unsuccessfulJobEvent = 1656 new JobUnsuccessfulCompletionEvent(oldJobId, 1657 finishTime, 1658 succeededMapTaskCount, 1659 succeededReduceTaskCount, 1660 finalState.toString(), 1661 diagnostics); 1662 eventHandler.handle(new JobHistoryEvent(jobId, 1663 unsuccessfulJobEvent)); 1664 finished(finalState); 1665 } 1666 1667 private static class JobAbortCompletedTransition 1668 implements SingleArcTransition<JobImpl, JobEvent> { 1669 @Override transition(JobImpl job, JobEvent event)1670 public void transition(JobImpl job, JobEvent event) { 1671 JobStateInternal finalState = JobStateInternal.valueOf( 1672 ((JobAbortCompletedEvent) event).getFinalState().name()); 1673 job.unsuccessfulFinish(finalState); 1674 } 1675 } 1676 1677 //This transition happens when a job is to be failed. It waits for all the 1678 //tasks to finish / be killed. 1679 private static class JobFailWaitTransition 1680 implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { 1681 @Override transition(JobImpl job, JobEvent event)1682 public JobStateInternal transition(JobImpl job, JobEvent event) { 1683 if(!job.failWaitTriggerScheduledFuture.isCancelled()) { 1684 for(Task task: job.tasks.values()) { 1685 if(!task.isFinished()) { 1686 return JobStateInternal.FAIL_WAIT; 1687 } 1688 } 1689 } 1690 //Finished waiting. All tasks finished / were killed 1691 job.failWaitTriggerScheduledFuture.cancel(false); 1692 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1693 job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); 1694 return JobStateInternal.FAIL_ABORT; 1695 } 1696 } 1697 1698 //This transition happens when a job to be failed times out while waiting on 1699 //tasks that had been sent the KILL signal. It is triggered by a 1700 //ScheduledFuture task queued in the executor. 1701 private static class JobFailWaitTimedOutTransition 1702 implements SingleArcTransition<JobImpl, JobEvent> { 1703 @Override transition(JobImpl job, JobEvent event)1704 public void transition(JobImpl job, JobEvent event) { 1705 LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." 1706 + " Going to fail job anyway"); 1707 job.failWaitTriggerScheduledFuture.cancel(false); 1708 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1709 job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); 1710 } 1711 } 1712 1713 // JobFinishedEvent triggers the move of the history file out of the staging 1714 // area. May need to create a new event type for this if JobFinished should 1715 // not be generated for KilledJobs, etc. createJobFinishedEvent(JobImpl job)1716 private static JobFinishedEvent createJobFinishedEvent(JobImpl job) { 1717 1718 job.mayBeConstructFinalFullCounters(); 1719 1720 JobFinishedEvent jfe = new JobFinishedEvent( 1721 job.oldJobId, job.finishTime, 1722 job.succeededMapTaskCount, job.succeededReduceTaskCount, 1723 job.failedMapTaskCount, job.failedReduceTaskCount, 1724 job.finalMapCounters, 1725 job.finalReduceCounters, 1726 job.fullCounters); 1727 return jfe; 1728 } 1729 mayBeConstructFinalFullCounters()1730 private void mayBeConstructFinalFullCounters() { 1731 // Calculating full-counters. This should happen only once for the job. 1732 synchronized (this.fullCountersLock) { 1733 if (this.fullCounters != null) { 1734 // Already constructed. Just return. 1735 return; 1736 } 1737 this.constructFinalFullcounters(); 1738 } 1739 } 1740 1741 @Private constructFinalFullcounters()1742 public void constructFinalFullcounters() { 1743 this.fullCounters = new Counters(); 1744 this.finalMapCounters = new Counters(); 1745 this.finalReduceCounters = new Counters(); 1746 this.fullCounters.incrAllCounters(jobCounters); 1747 for (Task t : this.tasks.values()) { 1748 Counters counters = t.getCounters(); 1749 switch (t.getType()) { 1750 case MAP: 1751 this.finalMapCounters.incrAllCounters(counters); 1752 break; 1753 case REDUCE: 1754 this.finalReduceCounters.incrAllCounters(counters); 1755 break; 1756 default: 1757 throw new IllegalStateException("Task type neither map nor reduce: " + 1758 t.getType()); 1759 } 1760 this.fullCounters.incrAllCounters(counters); 1761 } 1762 } 1763 1764 // Task-start has been moved out of InitTransition, so this arc simply 1765 // hardcodes 0 for both map and reduce finished tasks. 1766 private static class KillNewJobTransition 1767 implements SingleArcTransition<JobImpl, JobEvent> { 1768 @Override transition(JobImpl job, JobEvent event)1769 public void transition(JobImpl job, JobEvent event) { 1770 job.setFinishTime(); 1771 JobUnsuccessfulCompletionEvent failedEvent = 1772 new JobUnsuccessfulCompletionEvent(job.oldJobId, 1773 job.finishTime, 0, 0, 1774 JobStateInternal.KILLED.toString(), job.diagnostics); 1775 job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); 1776 job.finished(JobStateInternal.KILLED); 1777 } 1778 } 1779 1780 private static class KillInitedJobTransition 1781 implements SingleArcTransition<JobImpl, JobEvent> { 1782 @Override transition(JobImpl job, JobEvent event)1783 public void transition(JobImpl job, JobEvent event) { 1784 job.addDiagnostic("Job received Kill in INITED state."); 1785 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1786 job.jobContext, 1787 org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); 1788 } 1789 } 1790 1791 private static class KilledDuringSetupTransition 1792 implements SingleArcTransition<JobImpl, JobEvent> { 1793 @Override transition(JobImpl job, JobEvent event)1794 public void transition(JobImpl job, JobEvent event) { 1795 job.metrics.endRunningJob(job); 1796 job.addDiagnostic("Job received kill in SETUP state."); 1797 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1798 job.jobContext, 1799 org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); 1800 } 1801 } 1802 1803 private static class KillTasksTransition 1804 implements SingleArcTransition<JobImpl, JobEvent> { 1805 @Override transition(JobImpl job, JobEvent event)1806 public void transition(JobImpl job, JobEvent event) { 1807 job.addDiagnostic(JOB_KILLED_DIAG); 1808 for (Task task : job.tasks.values()) { 1809 job.eventHandler.handle( 1810 new TaskEvent(task.getID(), TaskEventType.T_KILL)); 1811 } 1812 job.metrics.endRunningJob(job); 1813 } 1814 } 1815 1816 private static class TaskAttemptCompletedEventTransition implements 1817 SingleArcTransition<JobImpl, JobEvent> { 1818 @Override transition(JobImpl job, JobEvent event)1819 public void transition(JobImpl job, JobEvent event) { 1820 TaskAttemptCompletionEvent tce = 1821 ((JobTaskAttemptCompletedEvent) event).getCompletionEvent(); 1822 // Add the TaskAttemptCompletionEvent 1823 //eventId is equal to index in the arraylist 1824 tce.setEventId(job.taskAttemptCompletionEvents.size()); 1825 job.taskAttemptCompletionEvents.add(tce); 1826 int mapEventIdx = -1; 1827 if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) { 1828 // we track map completions separately from task completions because 1829 // - getMapAttemptCompletionEvents uses index ranges specific to maps 1830 // - type converting the same events over and over is expensive 1831 mapEventIdx = job.mapAttemptCompletionEvents.size(); 1832 job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce)); 1833 } 1834 job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx); 1835 1836 TaskAttemptId attemptId = tce.getAttemptId(); 1837 TaskId taskId = attemptId.getTaskId(); 1838 //make the previous completion event as obsolete if it exists 1839 Integer successEventNo = 1840 job.successAttemptCompletionEventNoMap.remove(taskId); 1841 if (successEventNo != null) { 1842 TaskAttemptCompletionEvent successEvent = 1843 job.taskAttemptCompletionEvents.get(successEventNo); 1844 successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE); 1845 int mapCompletionIdx = 1846 job.taskCompletionIdxToMapCompletionIdx.get(successEventNo); 1847 if (mapCompletionIdx >= 0) { 1848 // update the corresponding TaskCompletionEvent for the map 1849 TaskCompletionEvent mapEvent = 1850 job.mapAttemptCompletionEvents.get(mapCompletionIdx); 1851 job.mapAttemptCompletionEvents.set(mapCompletionIdx, 1852 new TaskCompletionEvent(mapEvent.getEventId(), 1853 mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(), 1854 mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE, 1855 mapEvent.getTaskTrackerHttp())); 1856 } 1857 } 1858 1859 // if this attempt is not successful then why is the previous successful 1860 // attempt being removed above - MAPREDUCE-4330 1861 if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) { 1862 job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId()); 1863 1864 // here we could have simply called Task.getSuccessfulAttempt() but 1865 // the event that triggers this code is sent before 1866 // Task.successfulAttempt is set and so there is no guarantee that it 1867 // will be available now 1868 Task task = job.tasks.get(taskId); 1869 TaskAttempt attempt = task.getAttempt(attemptId); 1870 NodeId nodeId = attempt.getNodeId(); 1871 assert (nodeId != null); // node must exist for a successful event 1872 List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts 1873 .get(nodeId); 1874 if (taskAttemptIdList == null) { 1875 taskAttemptIdList = new ArrayList<TaskAttemptId>(); 1876 job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList); 1877 } 1878 taskAttemptIdList.add(attempt.getID()); 1879 } 1880 } 1881 } 1882 1883 private static class TaskAttemptFetchFailureTransition implements 1884 SingleArcTransition<JobImpl, JobEvent> { 1885 @Override transition(JobImpl job, JobEvent event)1886 public void transition(JobImpl job, JobEvent event) { 1887 //get number of shuffling reduces 1888 int shufflingReduceTasks = 0; 1889 for (TaskId taskId : job.reduceTasks) { 1890 Task task = job.tasks.get(taskId); 1891 if (TaskState.RUNNING.equals(task.getState())) { 1892 for(TaskAttempt attempt : task.getAttempts().values()) { 1893 if(attempt.getPhase() == Phase.SHUFFLE) { 1894 shufflingReduceTasks++; 1895 break; 1896 } 1897 } 1898 } 1899 } 1900 1901 JobTaskAttemptFetchFailureEvent fetchfailureEvent = 1902 (JobTaskAttemptFetchFailureEvent) event; 1903 for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : 1904 fetchfailureEvent.getMaps()) { 1905 Integer fetchFailures = job.fetchFailuresMapping.get(mapId); 1906 fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1); 1907 job.fetchFailuresMapping.put(mapId, fetchFailures); 1908 1909 float failureRate = shufflingReduceTasks == 0 ? 1.0f : 1910 (float) fetchFailures / shufflingReduceTasks; 1911 // declare faulty if fetch-failures >= max-allowed-failures 1912 if (fetchFailures >= job.getMaxFetchFailuresNotifications() 1913 && failureRate >= job.getMaxAllowedFetchFailuresFraction()) { 1914 LOG.info("Too many fetch-failures for output of task attempt: " + 1915 mapId + " ... raising fetch failure to map"); 1916 job.eventHandler.handle(new TaskAttemptEvent(mapId, 1917 TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)); 1918 job.fetchFailuresMapping.remove(mapId); 1919 } 1920 } 1921 } 1922 } 1923 1924 private static class TaskCompletedTransition implements 1925 MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { 1926 1927 @Override transition(JobImpl job, JobEvent event)1928 public JobStateInternal transition(JobImpl job, JobEvent event) { 1929 job.completedTaskCount++; 1930 LOG.info("Num completed Tasks: " + job.completedTaskCount); 1931 JobTaskEvent taskEvent = (JobTaskEvent) event; 1932 Task task = job.tasks.get(taskEvent.getTaskID()); 1933 if (taskEvent.getState() == TaskState.SUCCEEDED) { 1934 taskSucceeded(job, task); 1935 } else if (taskEvent.getState() == TaskState.FAILED) { 1936 taskFailed(job, task); 1937 } else if (taskEvent.getState() == TaskState.KILLED) { 1938 taskKilled(job, task); 1939 } 1940 1941 return checkJobAfterTaskCompletion(job); 1942 } 1943 1944 //This class is used to queue a ScheduledFuture to send an event to a job 1945 //after some delay. This can be used to wait for maximum amount of time 1946 //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for 1947 //all tasks to be killed. 1948 static class TriggerScheduledFuture implements Runnable { 1949 JobEvent toSend; 1950 JobImpl job; TriggerScheduledFuture(JobImpl job, JobEvent toSend)1951 TriggerScheduledFuture(JobImpl job, JobEvent toSend) { 1952 this.toSend = toSend; 1953 this.job = job; 1954 } run()1955 public void run() { 1956 LOG.info("Sending event " + toSend + " to " + job.getID()); 1957 job.getEventHandler().handle(toSend); 1958 } 1959 } 1960 checkJobAfterTaskCompletion(JobImpl job)1961 protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { 1962 //check for Job failure 1963 if (job.failedMapTaskCount*100 > 1964 job.allowedMapFailuresPercent*job.numMapTasks || 1965 job.failedReduceTaskCount*100 > 1966 job.allowedReduceFailuresPercent*job.numReduceTasks) { 1967 job.setFinishTime(); 1968 1969 String diagnosticMsg = "Job failed as tasks failed. " + 1970 "failedMaps:" + job.failedMapTaskCount + 1971 " failedReduces:" + job.failedReduceTaskCount; 1972 LOG.info(diagnosticMsg); 1973 job.addDiagnostic(diagnosticMsg); 1974 1975 //Send kill signal to all unfinished tasks here. 1976 boolean allDone = true; 1977 for (Task task : job.tasks.values()) { 1978 if(!task.isFinished()) { 1979 allDone = false; 1980 job.eventHandler.handle( 1981 new TaskEvent(task.getID(), TaskEventType.T_KILL)); 1982 } 1983 } 1984 1985 //If all tasks are already done, we should go directly to FAIL_ABORT 1986 if(allDone) { 1987 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 1988 job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED) 1989 ); 1990 return JobStateInternal.FAIL_ABORT; 1991 } 1992 1993 //Set max timeout to wait for the tasks to get killed 1994 job.failWaitTriggerScheduledFuture = job.executor.schedule( 1995 new TriggerScheduledFuture(job, new JobEvent(job.getID(), 1996 JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt( 1997 MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, 1998 MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS), 1999 TimeUnit.MILLISECONDS); 2000 return JobStateInternal.FAIL_WAIT; 2001 } 2002 2003 return job.checkReadyForCommit(); 2004 } 2005 taskSucceeded(JobImpl job, Task task)2006 private void taskSucceeded(JobImpl job, Task task) { 2007 if (task.getType() == TaskType.MAP) { 2008 job.succeededMapTaskCount++; 2009 } else { 2010 job.succeededReduceTaskCount++; 2011 } 2012 job.metrics.completedTask(task); 2013 } 2014 taskFailed(JobImpl job, Task task)2015 private void taskFailed(JobImpl job, Task task) { 2016 if (task.getType() == TaskType.MAP) { 2017 job.failedMapTaskCount++; 2018 } else if (task.getType() == TaskType.REDUCE) { 2019 job.failedReduceTaskCount++; 2020 } 2021 job.addDiagnostic("Task failed " + task.getID()); 2022 job.metrics.failedTask(task); 2023 } 2024 taskKilled(JobImpl job, Task task)2025 private void taskKilled(JobImpl job, Task task) { 2026 if (task.getType() == TaskType.MAP) { 2027 job.killedMapTaskCount++; 2028 } else if (task.getType() == TaskType.REDUCE) { 2029 job.killedReduceTaskCount++; 2030 } 2031 job.metrics.killedTask(task); 2032 } 2033 } 2034 2035 // Transition class for handling jobs with no tasks 2036 private static class JobNoTasksCompletedTransition implements 2037 MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { 2038 2039 @Override transition(JobImpl job, JobEvent event)2040 public JobStateInternal transition(JobImpl job, JobEvent event) { 2041 return job.checkReadyForCommit(); 2042 } 2043 } 2044 2045 private static class CommitSucceededTransition implements 2046 SingleArcTransition<JobImpl, JobEvent> { 2047 @Override transition(JobImpl job, JobEvent event)2048 public void transition(JobImpl job, JobEvent event) { 2049 job.logJobHistoryFinishedEvent(); 2050 job.finished(JobStateInternal.SUCCEEDED); 2051 } 2052 } 2053 2054 private static class CommitFailedTransition implements 2055 SingleArcTransition<JobImpl, JobEvent> { 2056 @Override transition(JobImpl job, JobEvent event)2057 public void transition(JobImpl job, JobEvent event) { 2058 JobCommitFailedEvent jcfe = (JobCommitFailedEvent)event; 2059 job.addDiagnostic("Job commit failed: " + jcfe.getMessage()); 2060 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 2061 job.jobContext, 2062 org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); 2063 } 2064 } 2065 2066 private static class KilledDuringCommitTransition implements 2067 SingleArcTransition<JobImpl, JobEvent> { 2068 @Override transition(JobImpl job, JobEvent event)2069 public void transition(JobImpl job, JobEvent event) { 2070 job.setFinishTime(); 2071 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 2072 job.jobContext, 2073 org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); 2074 } 2075 } 2076 2077 private static class KilledDuringAbortTransition implements 2078 SingleArcTransition<JobImpl, JobEvent> { 2079 @Override transition(JobImpl job, JobEvent event)2080 public void transition(JobImpl job, JobEvent event) { 2081 job.unsuccessfulFinish(JobStateInternal.KILLED); 2082 } 2083 } 2084 2085 private static class MapTaskRescheduledTransition implements 2086 SingleArcTransition<JobImpl, JobEvent> { 2087 @Override transition(JobImpl job, JobEvent event)2088 public void transition(JobImpl job, JobEvent event) { 2089 //succeeded map task is restarted back 2090 job.completedTaskCount--; 2091 job.succeededMapTaskCount--; 2092 } 2093 } 2094 2095 private static class KillWaitTaskCompletedTransition extends 2096 TaskCompletedTransition { 2097 @Override checkJobAfterTaskCompletion(JobImpl job)2098 protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { 2099 if (job.completedTaskCount == job.tasks.size()) { 2100 job.setFinishTime(); 2101 job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, 2102 job.jobContext, 2103 org.apache.hadoop.mapreduce.JobStatus.State.KILLED)); 2104 return JobStateInternal.KILL_ABORT; 2105 } 2106 //return the current state, Job not finished yet 2107 return job.getInternalState(); 2108 } 2109 } 2110 addDiagnostic(String diag)2111 protected void addDiagnostic(String diag) { 2112 diagnostics.add(diag); 2113 } 2114 2115 private static class DiagnosticsUpdateTransition implements 2116 SingleArcTransition<JobImpl, JobEvent> { 2117 @Override transition(JobImpl job, JobEvent event)2118 public void transition(JobImpl job, JobEvent event) { 2119 job.addDiagnostic(((JobDiagnosticsUpdateEvent) event) 2120 .getDiagnosticUpdate()); 2121 } 2122 } 2123 2124 private static class CounterUpdateTransition implements 2125 SingleArcTransition<JobImpl, JobEvent> { 2126 @Override transition(JobImpl job, JobEvent event)2127 public void transition(JobImpl job, JobEvent event) { 2128 JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event; 2129 for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce 2130 .getCounterUpdates()) { 2131 job.jobCounters.findCounter(ci.getCounterKey()).increment( 2132 ci.getIncrementValue()); 2133 } 2134 } 2135 } 2136 2137 private static class UpdatedNodesTransition implements 2138 SingleArcTransition<JobImpl, JobEvent> { 2139 @Override transition(JobImpl job, JobEvent event)2140 public void transition(JobImpl job, JobEvent event) { 2141 JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event; 2142 for(NodeReport nr: updateEvent.getUpdatedNodes()) { 2143 NodeState nodeState = nr.getNodeState(); 2144 if(nodeState.isUnusable()) { 2145 // act on the updates 2146 job.actOnUnusableNode(nr.getNodeId(), nodeState); 2147 } 2148 } 2149 } 2150 } 2151 2152 private static class InternalTerminationTransition implements 2153 SingleArcTransition<JobImpl, JobEvent> { 2154 JobStateInternal terminationState = null; 2155 String jobHistoryString = null; InternalTerminationTransition(JobStateInternal stateInternal, String jobHistoryString)2156 public InternalTerminationTransition(JobStateInternal stateInternal, 2157 String jobHistoryString) { 2158 this.terminationState = stateInternal; 2159 //mostly a hack for jbhistoryserver 2160 this.jobHistoryString = jobHistoryString; 2161 } 2162 2163 @Override transition(JobImpl job, JobEvent event)2164 public void transition(JobImpl job, JobEvent event) { 2165 //TODO Is this JH event required. 2166 job.setFinishTime(); 2167 JobUnsuccessfulCompletionEvent failedEvent = 2168 new JobUnsuccessfulCompletionEvent(job.oldJobId, 2169 job.finishTime, 0, 0, 2170 jobHistoryString, job.diagnostics); 2171 job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); 2172 job.finished(terminationState); 2173 } 2174 } 2175 2176 private static class InternalErrorTransition extends InternalTerminationTransition { InternalErrorTransition()2177 public InternalErrorTransition(){ 2178 super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString()); 2179 } 2180 } 2181 2182 private static class InternalRebootTransition extends InternalTerminationTransition { InternalRebootTransition()2183 public InternalRebootTransition(){ 2184 super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString()); 2185 } 2186 } 2187 2188 @Override loadConfFile()2189 public Configuration loadConfFile() throws IOException { 2190 Path confPath = getConfFile(); 2191 FileContext fc = FileContext.getFileContext(confPath.toUri(), conf); 2192 Configuration jobConf = new Configuration(false); 2193 jobConf.addResource(fc.open(confPath), confPath.toString()); 2194 return jobConf; 2195 } 2196 getMaxAllowedFetchFailuresFraction()2197 public float getMaxAllowedFetchFailuresFraction() { 2198 return maxAllowedFetchFailuresFraction; 2199 } 2200 getMaxFetchFailuresNotifications()2201 public int getMaxFetchFailuresNotifications() { 2202 return maxFetchFailuresNotifications; 2203 } 2204 } 2205