/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.v2.app.job.impl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link TaskImpl}: each test builds a {@link MockTaskImpl}, feeds
 * it {@link TaskEvent}s / {@link TaskTAttemptEvent}s directly through
 * {@link TaskImpl#handle}, and asserts on the resulting task state.
 *
 * <p>Attempts created by the task under test are recorded in
 * {@link #taskAttempts} so that tests can address them by creation order and
 * drive their reported progress, state and counters.
 */
@SuppressWarnings("rawtypes")
public class TestTaskImpl {

  private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);

  private JobConf conf;
  private TaskAttemptListener taskAttemptListener;
  private Token<JobTokenIdentifier> jobToken;
  private JobId jobId;
  private Path remoteJobConfFile;
  private Credentials credentials;
  private Clock clock;
  private MRAppMetrics metrics;
  private TaskImpl mockTask;
  private ApplicationId appId;
  private TaskSplitMetaInfo taskSplitMetaInfo;
  private String[] dataLocations = new String[0];
  private AppContext appContext;

  private int startCount = 0;
  private int taskCounter = 0;
  private final int partition = 1;

  private InlineDispatcher dispatcher;
  // Every attempt created by the task under test, in creation order.
  private List<MockTaskAttemptImpl> taskAttempts;

  /**
   * {@link TaskImpl} whose attempts are {@link MockTaskAttemptImpl}s and whose
   * internal errors fail the running test.
   */
  private class MockTaskImpl extends TaskImpl {

    private int taskAttemptCounter = 0;
    private final TaskType taskType;

    public MockTaskImpl(JobId jobId, int partition,
        EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
        TaskAttemptListener taskAttemptListener,
        Token<JobTokenIdentifier> jobToken,
        Credentials credentials, Clock clock, int startCount,
        MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
      super(jobId, taskType, partition, eventHandler,
          remoteJobConfFile, conf, taskAttemptListener,
          jobToken, credentials, clock,
          startCount, metrics, appContext);
      this.taskType = taskType;
    }

    @Override
    public TaskType getType() {
      return taskType;
    }

    /**
     * Creates a {@link MockTaskAttemptImpl} with the next attempt number and
     * records it in {@link TestTaskImpl#taskAttempts}.
     */
    @Override
    protected TaskAttemptImpl createAttempt() {
      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(),
          ++taskAttemptCounter, eventHandler, taskAttemptListener,
          remoteJobConfFile, partition, conf, jobToken, credentials, clock,
          appContext, taskType);
      taskAttempts.add(attempt);
      return attempt;
    }

    @Override
    protected int getMaxAttempts() {
      return 100;
    }

    /** Delegates to the superclass, then fails the test. */
    @Override
    protected void internalError(TaskEventType type) {
      super.internalError(type);
      fail("Internal error: " + type);
    }
  }

  /**
   * {@link TaskAttemptImpl} whose progress, state and counters are plain
   * fields set by the test.
   */
  private class MockTaskAttemptImpl extends TaskAttemptImpl {

    private float progress = 0;
    private TaskAttemptState state = TaskAttemptState.NEW;
    private final TaskType taskType;
    private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;

    public MockTaskAttemptImpl(TaskId taskId, int id,
        EventHandler eventHandler,
        TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
        JobConf conf, Token<JobTokenIdentifier> jobToken,
        Credentials credentials, Clock clock,
        AppContext appContext, TaskType taskType) {
      super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition,
          conf, dataLocations, jobToken, credentials, clock, appContext);
      this.taskType = taskType;
    }

    public TaskAttemptId getAttemptId() {
      return getID();
    }

    @Override
    protected Task createRemoteTask() {
      return new MockTask(taskType);
    }

    public float getProgress() {
      return progress;
    }

    public void setProgress(float progress) {
      this.progress = progress;
    }

    public void setState(TaskAttemptState state) {
      this.state = state;
    }

    public TaskAttemptState getState() {
      return state;
    }

    @Override
    public Counters getCounters() {
      return attemptCounters;
    }

    public void setCounters(Counters counters) {
      attemptCounters = counters;
    }
  }

  /** {@link Task} that does nothing when run. */
  private class MockTask extends Task {

    private final TaskType taskType;

    MockTask(TaskType taskType) {
      this.taskType = taskType;
    }

    @Override
    public void run(JobConf job, TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
      // Intentionally empty: the mock task performs no work.
    }

    @Override
    public boolean isMapTask() {
      return (taskType == TaskType.MAP);
    }

  }

  /** Rebuilds the dispatcher, configuration, ids and mocks before each test. */
  @Before
  @SuppressWarnings("unchecked")
  public void setup() {
    dispatcher = new InlineDispatcher();

    ++startCount;

    conf = new JobConf();
    taskAttemptListener = mock(TaskAttemptListener.class);
    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
    remoteJobConfFile = mock(Path.class);
    credentials = null;
    clock = new SystemClock();
    metrics = mock(MRAppMetrics.class);
    dataLocations = new String[1];

    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);

    jobId = Records.newRecord(JobId.class);
    jobId.setId(1);
    jobId.setAppId(appId);
    appContext = mock(AppContext.class);

    taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
    when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);

    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
  }

  /** Builds a {@link MockTaskImpl} of the given type from the fixture fields. */
  private MockTaskImpl createMockTask(TaskType taskType) {
    return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
        remoteJobConfFile, conf, taskAttemptListener, jobToken,
        credentials, clock,
        startCount, metrics, appContext, taskType);
  }

  @After
  public void teardown() {
    taskAttempts.clear();
  }

  /**
   * Creates a fresh {@link TaskId} for the current job, typed like
   * {@link #mockTask}; {@code mockTask} must already be assigned.
   */
  private TaskId getNewTaskID() {
    TaskId taskId = Records.newRecord(TaskId.class);
    taskId.setId(++taskCounter);
    taskId.setJobId(jobId);
    taskId.setTaskType(mockTask.getType());
    return taskId;
  }

  /**
   * Sends {@code T_SCHEDULE} and expects a SCHEDULED task holding a
   * {@link Avataar#VIRGIN} attempt.
   */
  private void scheduleTaskAttempt(TaskId taskId) {
    mockTask.handle(new TaskEvent(taskId,
        TaskEventType.T_SCHEDULE));
    assertTaskScheduledState();
    assertTaskAttemptAvataar(Avataar.VIRGIN);
  }

  /** Sends {@code T_KILL} and expects the internal KILL_WAIT state. */
  private void killTask(TaskId taskId) {
    mockTask.handle(new TaskEvent(taskId,
        TaskEventType.T_KILL));
    assertTaskKillWaitState();
  }

  /** Sends {@code T_ATTEMPT_KILLED} and expects the task to stay SCHEDULED. */
  private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_KILLED));
    assertTaskScheduledState();
  }

  /** Sends {@code T_ATTEMPT_LAUNCHED} and expects the task to be RUNNING. */
  private void launchTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_LAUNCHED));
    assertTaskRunningState();
  }

  /** Sends {@code T_ATTEMPT_COMMIT_PENDING} and expects the task to be RUNNING. */
  private void commitTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
    assertTaskRunningState();
  }

  /** Returns the most recently created attempt. */
  private MockTaskAttemptImpl getLastAttempt() {
    return taskAttempts.get(taskAttempts.size() - 1);
  }

  private void updateLastAttemptProgress(float p) {
    getLastAttempt().setProgress(p);
  }

  private void updateLastAttemptState(TaskAttemptState s) {
    getLastAttempt().setState(s);
  }

  /** Sends {@code T_ATTEMPT_KILLED} and expects the task to stay RUNNING. */
  private void killRunningTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_KILLED));
    assertTaskRunningState();
  }

  /** Sends {@code T_ATTEMPT_FAILED} and expects the task to stay RUNNING. */
  private void failRunningTaskAttempt(TaskAttemptId attemptId) {
    mockTask.handle(new TaskTAttemptEvent(attemptId,
        TaskEventType.T_ATTEMPT_FAILED));
    assertTaskRunningState();
  }

  /**
   * {@link TaskState#NEW}
   */
  private void assertTaskNewState() {
    assertEquals(TaskState.NEW, mockTask.getState());
  }

  /**
   * {@link TaskState#SCHEDULED}
   */
  private void assertTaskScheduledState() {
    assertEquals(TaskState.SCHEDULED, mockTask.getState());
  }

  /**
   * {@link TaskState#RUNNING}
   */
  private void assertTaskRunningState() {
    assertEquals(TaskState.RUNNING, mockTask.getState());
  }

  /**
   * {@link TaskStateInternal#KILL_WAIT}
   */
  private void assertTaskKillWaitState() {
    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
  }

  /**
   * {@link TaskState#SUCCEEDED}
   */
  private void assertTaskSucceededState() {
    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
  }

  /**
   * Fails unless at least one attempt of the task has the given
   * {@link Avataar}.
   */
  private void assertTaskAttemptAvataar(Avataar avataar) {
    for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
      if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
        return;
      }
    }
    fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
        + " task attempt");
  }

  /**
   * A newly created task is in {@link TaskState#NEW} and has no attempts.
   */
  @Test
  public void testInit() {
    LOG.info("--- START: testInit ---");
    mockTask = createMockTask(TaskType.MAP);
    assertTaskNewState();
    // JUnit assertion rather than the assert keyword, which only runs with -ea.
    assertEquals(0, taskAttempts.size());
  }

  /**
   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
   */
  @Test
  public void testScheduleTask() {
    LOG.info("--- START: testScheduleTask ---");
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
  }

  /**
   * {@link TaskState#SCHEDULED}->{@link TaskStateInternal#KILL_WAIT}
   */
  @Test
  public void testKillScheduledTask() {
    LOG.info("--- START: testKillScheduledTask ---");
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    killTask(taskId);
  }

  /**
   * Kill attempt
   * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
   */
  @Test
  public void testKillScheduledTaskAttempt() {
    LOG.info("--- START: testKillScheduledTaskAttempt ---");
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    killScheduledTaskAttempt(getLastAttempt().getAttemptId());
  }

  /**
   * Launch attempt
   * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
   */
  @Test
  public void testLaunchTaskAttempt() {
    LOG.info("--- START: testLaunchTaskAttempt ---");
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
  }

  /**
   * Kill running attempt
   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
   */
  @Test
  public void testKillRunningTaskAttempt() {
    LOG.info("--- START: testKillRunningTaskAttempt ---");
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    killRunningTaskAttempt(getLastAttempt().getAttemptId());
  }

  /**
   * A {@code T_KILL} sent to a task that already succeeded leaves it in
   * {@link TaskState#SUCCEEDED}.
   */
  @Test
  public void testKillSuccessfulTask() {
    LOG.info("--- START: testKillSuccessfulTask ---");
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    commitTaskAttempt(getLastAttempt().getAttemptId());
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ATTEMPT_SUCCEEDED));
    assertTaskSucceededState();
    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
    assertTaskSucceededState();
  }

  /**
   * Task progress follows the progress of the latest attempt and drops back
   * to zero once that attempt is marked killed.
   */
  @Test
  public void testTaskProgress() {
    LOG.info("--- START: testTaskProgress ---");
    mockTask = createMockTask(TaskType.MAP);

    // launch task
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    float progress = 0f;
    // A delta of 0 keeps these comparisons exact.
    assertEquals(progress, mockTask.getProgress(), 0f);
    launchTaskAttempt(getLastAttempt().getAttemptId());

    // update attempt1
    progress = 50f;
    updateLastAttemptProgress(progress);
    assertEquals(progress, mockTask.getProgress(), 0f);
    progress = 100f;
    updateLastAttemptProgress(progress);
    assertEquals(progress, mockTask.getProgress(), 0f);

    progress = 0f;
    // mark first attempt as killed
    updateLastAttemptState(TaskAttemptState.KILLED);
    assertEquals(progress, mockTask.getProgress(), 0f);

    // kill first attempt
    // should trigger a new attempt
    // as no successful attempts
    killRunningTaskAttempt(getLastAttempt().getAttemptId());
    assertEquals(2, taskAttempts.size());

    assertEquals(0f, mockTask.getProgress(), 0f);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    progress = 50f;
    updateLastAttemptProgress(progress);
    assertEquals(progress, mockTask.getProgress(), 0f);
  }

  /**
   * An attempt killed while its commit is pending must not be allowed to
   * commit.
   */
  @Test
  public void testKillDuringTaskAttemptCommit() {
    mockTask = createMockTask(TaskType.REDUCE);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);

    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
    commitTaskAttempt(getLastAttempt().getAttemptId());

    TaskAttemptId commitAttempt = getLastAttempt().getAttemptId();
    updateLastAttemptState(TaskAttemptState.KILLED);
    killRunningTaskAttempt(commitAttempt);

    assertFalse(mockTask.canCommit(commitAttempt));
  }

  /**
   * When the committing attempt fails, a second attempt is created and is the
   * one allowed to commit.
   */
  @Test
  public void testFailureDuringTaskAttemptCommit() {
    mockTask = createMockTask(TaskType.MAP);
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
    commitTaskAttempt(getLastAttempt().getAttemptId());

    // During the task attempt commit there is an exception which causes
    // the attempt to fail
    updateLastAttemptState(TaskAttemptState.FAILED);
    failRunningTaskAttempt(getLastAttempt().getAttemptId());

    assertEquals(2, taskAttempts.size());
    updateLastAttemptState(TaskAttemptState.SUCCEEDED);
    commitTaskAttempt(getLastAttempt().getAttemptId());
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ATTEMPT_SUCCEEDED));

    assertFalse("First attempt should not commit",
        mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
    assertTrue("Second attempt should commit",
        mockTask.canCommit(getLastAttempt().getAttemptId()));

    assertTaskSucceededState();
  }

  /**
   * Runs a first attempt plus a speculative attempt that succeeds, then
   * delivers {@code firstAttemptFinishEvent} for the first attempt and checks
   * that the task stays {@link TaskState#SUCCEEDED}.
   * {@link #mockTask} must be assigned by the caller beforehand.
   *
   * @param firstAttemptFinishEvent event sent for the first attempt after the
   *        speculative attempt has succeeded
   */
  private void runSpeculativeTaskAttemptSucceeds(
      TaskEventType firstAttemptFinishEvent) {
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.RUNNING);

    // Add a speculative task attempt that succeeds
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
    commitTaskAttempt(getLastAttempt().getAttemptId());
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ATTEMPT_SUCCEEDED));

    // The task should now have succeeded
    assertTaskSucceededState();

    // Now complete the first task attempt, after the second has succeeded
    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
        firstAttemptFinishEvent));

    // The task should still be in the succeeded state
    assertTaskSucceededState();

    // The task should contain a speculative task attempt
    assertTaskAttemptAvataar(Avataar.SPECULATIVE);
  }

  @Test
  public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
    mockTask = createMockTask(TaskType.MAP);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
  }

  @Test
  public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
    mockTask = createMockTask(TaskType.REDUCE);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
  }

  @Test
  public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
    mockTask = createMockTask(TaskType.MAP);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
  }

  @Test
  public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
    mockTask = createMockTask(TaskType.REDUCE);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
  }

  @Test
  public void testMultipleTaskAttemptsSucceed() {
    mockTask = createMockTask(TaskType.MAP);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
  }

  @Test
  public void testCommitAfterSucceeds() {
    mockTask = createMockTask(TaskType.REDUCE);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
  }

  @Test
  public void testSpeculativeMapFetchFailure() {
    // Setup a scenario where speculative task wins, first attempt killed
    mockTask = createMockTask(TaskType.MAP);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
    assertEquals(2, taskAttempts.size());

    // speculative attempt retroactively fails from fetch failures
    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
        TaskEventType.T_ATTEMPT_FAILED));

    assertTaskScheduledState();
    assertEquals(3, taskAttempts.size());
  }

  @Test
  public void testSpeculativeMapMultipleSucceedFetchFailure() {
    // Setup a scenario where speculative task wins, first attempt succeeds
    mockTask = createMockTask(TaskType.MAP);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
    assertEquals(2, taskAttempts.size());

    // speculative attempt retroactively fails from fetch failures
    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
        TaskEventType.T_ATTEMPT_FAILED));

    assertTaskScheduledState();
    assertEquals(3, taskAttempts.size());
  }

  @Test
  public void testSpeculativeMapFailedFetchFailure() {
    // Setup a scenario where speculative task wins, first attempt fails
    mockTask = createMockTask(TaskType.MAP);
    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
    assertEquals(2, taskAttempts.size());

    // speculative attempt retroactively fails from fetch failures
    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
        TaskEventType.T_ATTEMPT_FAILED));

    assertTaskScheduledState();
    assertEquals(3, taskAttempts.size());
  }

  /**
   * With a single allowed attempt, the first attempt failure moves the task
   * to {@link TaskState#FAILED}, and no later task or attempt event moves it
   * out of that state or creates new attempts.
   */
  @Test
  public void testFailedTransitions() {
    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
        remoteJobConfFile, conf, taskAttemptListener, jobToken,
        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
          @Override
          protected int getMaxAttempts() {
            return 1;
          }
    };
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());

    // add three more speculative attempts
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
    assertEquals(4, taskAttempts.size());

    // have the first attempt fail, verify task failed due to no retries
    MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
    taskAttempt.setState(TaskAttemptState.FAILED);
    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_FAILED));
    assertEquals(TaskState.FAILED, mockTask.getState());

    // verify task can no longer be killed
    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
    assertEquals(TaskState.FAILED, mockTask.getState());

    // verify speculative doesn't launch new tasks
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ATTEMPT_LAUNCHED));
    assertEquals(TaskState.FAILED, mockTask.getState());
    assertEquals(4, taskAttempts.size());

    // verify attempt events from active tasks don't knock task out of FAILED
    taskAttempt = taskAttempts.get(1);
    taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
    assertEquals(TaskState.FAILED, mockTask.getState());
    taskAttempt.setState(TaskAttemptState.FAILED);
    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_FAILED));
    assertEquals(TaskState.FAILED, mockTask.getState());
    taskAttempt = taskAttempts.get(2);
    taskAttempt.setState(TaskAttemptState.SUCCEEDED);
    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_SUCCEEDED));
    assertEquals(TaskState.FAILED, mockTask.getState());
    taskAttempt = taskAttempts.get(3);
    taskAttempt.setState(TaskAttemptState.KILLED);
    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_KILLED));
    assertEquals(TaskState.FAILED, mockTask.getState());
  }

  /**
   * When the speculative attempt succeeds, the task reports that attempt's
   * counters even though the base attempt also reports full progress.
   */
  @Test
  public void testCountersWithSpeculation() {
    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
        remoteJobConfFile, conf, taskAttemptListener, jobToken,
        credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
          @Override
          protected int getMaxAttempts() {
            return 1;
          }
    };
    TaskId taskId = getNewTaskID();
    scheduleTaskAttempt(taskId);
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.RUNNING);
    MockTaskAttemptImpl baseAttempt = getLastAttempt();

    // add a speculative attempt
    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
        TaskEventType.T_ADD_SPEC_ATTEMPT));
    launchTaskAttempt(getLastAttempt().getAttemptId());
    updateLastAttemptState(TaskAttemptState.RUNNING);
    MockTaskAttemptImpl specAttempt = getLastAttempt();
    assertEquals(2, taskAttempts.size());

    Counters specAttemptCounters = new Counters();
    Counter cpuCounter = specAttemptCounters.findCounter(
        TaskCounter.CPU_MILLISECONDS);
    cpuCounter.setValue(1000);
    specAttempt.setCounters(specAttemptCounters);

    // have the spec attempt succeed, with the base attempt at 1.0 progress
    // as well
    commitTaskAttempt(specAttempt.getAttemptId());
    specAttempt.setProgress(1.0f);
    specAttempt.setState(TaskAttemptState.SUCCEEDED);
    mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
        TaskEventType.T_ATTEMPT_SUCCEEDED));
    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
    baseAttempt.setProgress(1.0f);

    Counters taskCounters = mockTask.getCounters();
    assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
  }
}