1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.mapreduce.v2.app.job.impl;
19 
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.when;
26 
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.List;
30 
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.mapred.JobConf;
35 import org.apache.hadoop.mapred.Task;
36 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
37 import org.apache.hadoop.mapreduce.Counter;
38 import org.apache.hadoop.mapreduce.Counters;
39 import org.apache.hadoop.mapreduce.TaskCounter;
40 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
41 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
42 import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
43 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
44 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
45 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
46 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
47 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
48 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
49 import org.apache.hadoop.mapreduce.v2.app.AppContext;
50 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
51 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
52 import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
53 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
54 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
55 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
56 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
57 import org.apache.hadoop.security.Credentials;
58 import org.apache.hadoop.security.token.Token;
59 import org.apache.hadoop.yarn.api.records.ApplicationId;
60 import org.apache.hadoop.yarn.event.EventHandler;
61 import org.apache.hadoop.yarn.event.InlineDispatcher;
62 import org.apache.hadoop.yarn.util.Clock;
63 import org.apache.hadoop.yarn.util.Records;
64 import org.apache.hadoop.yarn.util.SystemClock;
65 import org.junit.After;
66 import org.junit.Before;
67 import org.junit.Test;
68 
69 @SuppressWarnings("rawtypes")
70 public class TestTaskImpl {
71 
72   private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);
73 
74   private JobConf conf;
75   private TaskAttemptListener taskAttemptListener;
76   private Token<JobTokenIdentifier> jobToken;
77   private JobId jobId;
78   private Path remoteJobConfFile;
79   private Credentials credentials;
80   private Clock clock;
81   private MRAppMetrics metrics;
82   private TaskImpl mockTask;
83   private ApplicationId appId;
84   private TaskSplitMetaInfo taskSplitMetaInfo;
85   private String[] dataLocations = new String[0];
86   private AppContext appContext;
87 
88   private int startCount = 0;
89   private int taskCounter = 0;
90   private final int partition = 1;
91 
92   private InlineDispatcher dispatcher;
93   private List<MockTaskAttemptImpl> taskAttempts;
94 
95   private class MockTaskImpl extends TaskImpl {
96 
97     private int taskAttemptCounter = 0;
98     TaskType taskType;
99 
MockTaskImpl(JobId jobId, int partition, EventHandler eventHandler, Path remoteJobConfFile, JobConf conf, TaskAttemptListener taskAttemptListener, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, int startCount, MRAppMetrics metrics, AppContext appContext, TaskType taskType)100     public MockTaskImpl(JobId jobId, int partition,
101         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
102         TaskAttemptListener taskAttemptListener,
103         Token<JobTokenIdentifier> jobToken,
104         Credentials credentials, Clock clock, int startCount,
105         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
106       super(jobId, taskType , partition, eventHandler,
107           remoteJobConfFile, conf, taskAttemptListener,
108           jobToken, credentials, clock,
109           startCount, metrics, appContext);
110       this.taskType = taskType;
111     }
112 
113     @Override
getType()114     public TaskType getType() {
115       return taskType;
116     }
117 
118     @Override
createAttempt()119     protected TaskAttemptImpl createAttempt() {
120       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
121           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
122           conf, jobToken, credentials, clock, appContext, taskType);
123       taskAttempts.add(attempt);
124       return attempt;
125     }
126 
127     @Override
getMaxAttempts()128     protected int getMaxAttempts() {
129       return 100;
130     }
131 
132     @Override
internalError(TaskEventType type)133     protected void internalError(TaskEventType type) {
134       super.internalError(type);
135       fail("Internal error: " + type);
136     }
137   }
138 
139   private class MockTaskAttemptImpl extends TaskAttemptImpl {
140 
141     private float progress = 0;
142     private TaskAttemptState state = TaskAttemptState.NEW;
143     private TaskType taskType;
144     private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
145 
MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Path jobFile, int partition, JobConf conf, Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock, AppContext appContext, TaskType taskType)146     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
147         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
148         JobConf conf, Token<JobTokenIdentifier> jobToken,
149         Credentials credentials, Clock clock,
150         AppContext appContext, TaskType taskType) {
151       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
152           dataLocations, jobToken, credentials, clock, appContext);
153       this.taskType = taskType;
154     }
155 
getAttemptId()156     public TaskAttemptId getAttemptId() {
157       return getID();
158     }
159 
160     @Override
createRemoteTask()161     protected Task createRemoteTask() {
162       return new MockTask(taskType);
163     }
164 
getProgress()165     public float getProgress() {
166       return progress ;
167     }
168 
setProgress(float progress)169     public void setProgress(float progress) {
170       this.progress = progress;
171     }
172 
setState(TaskAttemptState state)173     public void setState(TaskAttemptState state) {
174       this.state = state;
175     }
176 
getState()177     public TaskAttemptState getState() {
178       return state;
179     }
180 
181     @Override
getCounters()182     public Counters getCounters() {
183       return attemptCounters;
184     }
185 
setCounters(Counters counters)186     public void setCounters(Counters counters) {
187       attemptCounters = counters;
188     }
189   }
190 
191   private class MockTask extends Task {
192 
193     private TaskType taskType;
MockTask(TaskType taskType)194     MockTask(TaskType taskType) {
195       this.taskType = taskType;
196     }
197 
198     @Override
run(JobConf job, TaskUmbilicalProtocol umbilical)199     public void run(JobConf job, TaskUmbilicalProtocol umbilical)
200         throws IOException, ClassNotFoundException, InterruptedException {
201       return;
202     }
203 
204     @Override
isMapTask()205     public boolean isMapTask() {
206       return (taskType == TaskType.MAP);
207     }
208 
209   }
210 
211   @Before
212   @SuppressWarnings("unchecked")
setup()213   public void setup() {
214      dispatcher = new InlineDispatcher();
215 
216     ++startCount;
217 
218     conf = new JobConf();
219     taskAttemptListener = mock(TaskAttemptListener.class);
220     jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
221     remoteJobConfFile = mock(Path.class);
222     credentials = null;
223     clock = new SystemClock();
224     metrics = mock(MRAppMetrics.class);
225     dataLocations = new String[1];
226 
227     appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
228 
229     jobId = Records.newRecord(JobId.class);
230     jobId.setId(1);
231     jobId.setAppId(appId);
232     appContext = mock(AppContext.class);
233 
234     taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
235     when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);
236 
237     taskAttempts = new ArrayList<MockTaskAttemptImpl>();
238   }
239 
createMockTask(TaskType taskType)240   private MockTaskImpl createMockTask(TaskType taskType) {
241     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
242         remoteJobConfFile, conf, taskAttemptListener, jobToken,
243         credentials, clock,
244         startCount, metrics, appContext, taskType);
245   }
246 
247   @After
teardown()248   public void teardown() {
249     taskAttempts.clear();
250   }
251 
getNewTaskID()252   private TaskId getNewTaskID() {
253     TaskId taskId = Records.newRecord(TaskId.class);
254     taskId.setId(++taskCounter);
255     taskId.setJobId(jobId);
256     taskId.setTaskType(mockTask.getType());
257     return taskId;
258   }
259 
scheduleTaskAttempt(TaskId taskId)260   private void scheduleTaskAttempt(TaskId taskId) {
261     mockTask.handle(new TaskEvent(taskId,
262         TaskEventType.T_SCHEDULE));
263     assertTaskScheduledState();
264     assertTaskAttemptAvataar(Avataar.VIRGIN);
265   }
266 
killTask(TaskId taskId)267   private void killTask(TaskId taskId) {
268     mockTask.handle(new TaskEvent(taskId,
269         TaskEventType.T_KILL));
270     assertTaskKillWaitState();
271   }
272 
killScheduledTaskAttempt(TaskAttemptId attemptId)273   private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
274     mockTask.handle(new TaskTAttemptEvent(attemptId,
275         TaskEventType.T_ATTEMPT_KILLED));
276     assertTaskScheduledState();
277   }
278 
launchTaskAttempt(TaskAttemptId attemptId)279   private void launchTaskAttempt(TaskAttemptId attemptId) {
280     mockTask.handle(new TaskTAttemptEvent(attemptId,
281         TaskEventType.T_ATTEMPT_LAUNCHED));
282     assertTaskRunningState();
283   }
284 
commitTaskAttempt(TaskAttemptId attemptId)285   private void commitTaskAttempt(TaskAttemptId attemptId) {
286     mockTask.handle(new TaskTAttemptEvent(attemptId,
287         TaskEventType.T_ATTEMPT_COMMIT_PENDING));
288     assertTaskRunningState();
289   }
290 
getLastAttempt()291   private MockTaskAttemptImpl getLastAttempt() {
292     return taskAttempts.get(taskAttempts.size()-1);
293   }
294 
updateLastAttemptProgress(float p)295   private void updateLastAttemptProgress(float p) {
296     getLastAttempt().setProgress(p);
297   }
298 
updateLastAttemptState(TaskAttemptState s)299   private void updateLastAttemptState(TaskAttemptState s) {
300     getLastAttempt().setState(s);
301   }
302 
killRunningTaskAttempt(TaskAttemptId attemptId)303   private void killRunningTaskAttempt(TaskAttemptId attemptId) {
304     mockTask.handle(new TaskTAttemptEvent(attemptId,
305         TaskEventType.T_ATTEMPT_KILLED));
306     assertTaskRunningState();
307   }
308 
failRunningTaskAttempt(TaskAttemptId attemptId)309   private void failRunningTaskAttempt(TaskAttemptId attemptId) {
310     mockTask.handle(new TaskTAttemptEvent(attemptId,
311         TaskEventType.T_ATTEMPT_FAILED));
312     assertTaskRunningState();
313   }
314 
315   /**
316    * {@link TaskState#NEW}
317    */
assertTaskNewState()318   private void assertTaskNewState() {
319     assertEquals(TaskState.NEW, mockTask.getState());
320   }
321 
322   /**
323    * {@link TaskState#SCHEDULED}
324    */
assertTaskScheduledState()325   private void assertTaskScheduledState() {
326     assertEquals(TaskState.SCHEDULED, mockTask.getState());
327   }
328 
329   /**
330    * {@link TaskState#RUNNING}
331    */
assertTaskRunningState()332   private void assertTaskRunningState() {
333     assertEquals(TaskState.RUNNING, mockTask.getState());
334   }
335 
336   /**
337    * {@link TaskState#KILL_WAIT}
338    */
assertTaskKillWaitState()339   private void assertTaskKillWaitState() {
340     assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
341   }
342 
343   /**
344    * {@link TaskState#SUCCEEDED}
345    */
assertTaskSucceededState()346   private void assertTaskSucceededState() {
347     assertEquals(TaskState.SUCCEEDED, mockTask.getState());
348   }
349 
350   /**
351    * {@link Avataar}
352    */
assertTaskAttemptAvataar(Avataar avataar)353   private void assertTaskAttemptAvataar(Avataar avataar) {
354     for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
355       if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
356         return;
357       }
358     }
359     fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
360         + "task attempt");
361   }
362 
363   @Test
testInit()364   public void testInit() {
365     LOG.info("--- START: testInit ---");
366     mockTask = createMockTask(TaskType.MAP);
367     assertTaskNewState();
368     assert(taskAttempts.size() == 0);
369   }
370 
371   @Test
372   /**
373    * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
374    */
testScheduleTask()375   public void testScheduleTask() {
376     LOG.info("--- START: testScheduleTask ---");
377     mockTask = createMockTask(TaskType.MAP);
378     TaskId taskId = getNewTaskID();
379     scheduleTaskAttempt(taskId);
380   }
381 
382   @Test
383   /**
384    * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
385    */
testKillScheduledTask()386   public void testKillScheduledTask() {
387     LOG.info("--- START: testKillScheduledTask ---");
388     mockTask = createMockTask(TaskType.MAP);
389     TaskId taskId = getNewTaskID();
390     scheduleTaskAttempt(taskId);
391     killTask(taskId);
392   }
393 
394   @Test
395   /**
396    * Kill attempt
397    * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
398    */
testKillScheduledTaskAttempt()399   public void testKillScheduledTaskAttempt() {
400     LOG.info("--- START: testKillScheduledTaskAttempt ---");
401     mockTask = createMockTask(TaskType.MAP);
402     TaskId taskId = getNewTaskID();
403     scheduleTaskAttempt(taskId);
404     killScheduledTaskAttempt(getLastAttempt().getAttemptId());
405   }
406 
407   @Test
408   /**
409    * Launch attempt
410    * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
411    */
testLaunchTaskAttempt()412   public void testLaunchTaskAttempt() {
413     LOG.info("--- START: testLaunchTaskAttempt ---");
414     mockTask = createMockTask(TaskType.MAP);
415     TaskId taskId = getNewTaskID();
416     scheduleTaskAttempt(taskId);
417     launchTaskAttempt(getLastAttempt().getAttemptId());
418   }
419 
420   @Test
421   /**
422    * Kill running attempt
423    * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
424    */
testKillRunningTaskAttempt()425   public void testKillRunningTaskAttempt() {
426     LOG.info("--- START: testKillRunningTaskAttempt ---");
427     mockTask = createMockTask(TaskType.MAP);
428     TaskId taskId = getNewTaskID();
429     scheduleTaskAttempt(taskId);
430     launchTaskAttempt(getLastAttempt().getAttemptId());
431     killRunningTaskAttempt(getLastAttempt().getAttemptId());
432   }
433 
434   @Test
testKillSuccessfulTask()435   public void testKillSuccessfulTask() {
436     LOG.info("--- START: testKillSuccesfulTask ---");
437     mockTask = createMockTask(TaskType.MAP);
438     TaskId taskId = getNewTaskID();
439     scheduleTaskAttempt(taskId);
440     launchTaskAttempt(getLastAttempt().getAttemptId());
441     commitTaskAttempt(getLastAttempt().getAttemptId());
442     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
443         TaskEventType.T_ATTEMPT_SUCCEEDED));
444     assertTaskSucceededState();
445     mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
446     assertTaskSucceededState();
447   }
448 
449   @Test
testTaskProgress()450   public void testTaskProgress() {
451     LOG.info("--- START: testTaskProgress ---");
452     mockTask = createMockTask(TaskType.MAP);
453 
454     // launch task
455     TaskId taskId = getNewTaskID();
456     scheduleTaskAttempt(taskId);
457     float progress = 0f;
458     assert(mockTask.getProgress() == progress);
459     launchTaskAttempt(getLastAttempt().getAttemptId());
460 
461     // update attempt1
462     progress = 50f;
463     updateLastAttemptProgress(progress);
464     assert(mockTask.getProgress() == progress);
465     progress = 100f;
466     updateLastAttemptProgress(progress);
467     assert(mockTask.getProgress() == progress);
468 
469     progress = 0f;
470     // mark first attempt as killed
471     updateLastAttemptState(TaskAttemptState.KILLED);
472     assert(mockTask.getProgress() == progress);
473 
474     // kill first attempt
475     // should trigger a new attempt
476     // as no successful attempts
477     killRunningTaskAttempt(getLastAttempt().getAttemptId());
478     assert(taskAttempts.size() == 2);
479 
480     assert(mockTask.getProgress() == 0f);
481     launchTaskAttempt(getLastAttempt().getAttemptId());
482     progress = 50f;
483     updateLastAttemptProgress(progress);
484     assert(mockTask.getProgress() == progress);
485 
486   }
487 
488 
489   @Test
testKillDuringTaskAttemptCommit()490   public void testKillDuringTaskAttemptCommit() {
491     mockTask = createMockTask(TaskType.REDUCE);
492     TaskId taskId = getNewTaskID();
493     scheduleTaskAttempt(taskId);
494 
495     launchTaskAttempt(getLastAttempt().getAttemptId());
496     updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
497     commitTaskAttempt(getLastAttempt().getAttemptId());
498 
499     TaskAttemptId commitAttempt = getLastAttempt().getAttemptId();
500     updateLastAttemptState(TaskAttemptState.KILLED);
501     killRunningTaskAttempt(commitAttempt);
502 
503     assertFalse(mockTask.canCommit(commitAttempt));
504   }
505 
506   @Test
testFailureDuringTaskAttemptCommit()507   public void testFailureDuringTaskAttemptCommit() {
508     mockTask = createMockTask(TaskType.MAP);
509     TaskId taskId = getNewTaskID();
510     scheduleTaskAttempt(taskId);
511     launchTaskAttempt(getLastAttempt().getAttemptId());
512     updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
513     commitTaskAttempt(getLastAttempt().getAttemptId());
514 
515     // During the task attempt commit there is an exception which causes
516     // the attempt to fail
517     updateLastAttemptState(TaskAttemptState.FAILED);
518     failRunningTaskAttempt(getLastAttempt().getAttemptId());
519 
520     assertEquals(2, taskAttempts.size());
521     updateLastAttemptState(TaskAttemptState.SUCCEEDED);
522     commitTaskAttempt(getLastAttempt().getAttemptId());
523     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
524         TaskEventType.T_ATTEMPT_SUCCEEDED));
525 
526     assertFalse("First attempt should not commit",
527         mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
528     assertTrue("Second attempt should commit",
529         mockTask.canCommit(getLastAttempt().getAttemptId()));
530 
531     assertTaskSucceededState();
532   }
533 
runSpeculativeTaskAttemptSucceeds( TaskEventType firstAttemptFinishEvent)534   private void runSpeculativeTaskAttemptSucceeds(
535       TaskEventType firstAttemptFinishEvent) {
536     TaskId taskId = getNewTaskID();
537     scheduleTaskAttempt(taskId);
538     launchTaskAttempt(getLastAttempt().getAttemptId());
539     updateLastAttemptState(TaskAttemptState.RUNNING);
540 
541     // Add a speculative task attempt that succeeds
542     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
543         TaskEventType.T_ADD_SPEC_ATTEMPT));
544     launchTaskAttempt(getLastAttempt().getAttemptId());
545     commitTaskAttempt(getLastAttempt().getAttemptId());
546     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
547         TaskEventType.T_ATTEMPT_SUCCEEDED));
548 
549     // The task should now have succeeded
550     assertTaskSucceededState();
551 
552     // Now complete the first task attempt, after the second has succeeded
553     mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
554         firstAttemptFinishEvent));
555 
556     // The task should still be in the succeeded state
557     assertTaskSucceededState();
558 
559     // The task should contain speculative a task attempt
560     assertTaskAttemptAvataar(Avataar.SPECULATIVE);
561   }
562 
563   @Test
testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails()564   public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
565     mockTask = createMockTask(TaskType.MAP);
566     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
567   }
568 
569   @Test
testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails()570   public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
571     mockTask = createMockTask(TaskType.REDUCE);
572     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
573   }
574 
575   @Test
testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled()576   public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
577     mockTask = createMockTask(TaskType.MAP);
578     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
579   }
580 
581   @Test
testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled()582   public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
583     mockTask = createMockTask(TaskType.REDUCE);
584     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
585   }
586 
587   @Test
testMultipleTaskAttemptsSucceed()588   public void testMultipleTaskAttemptsSucceed() {
589     mockTask = createMockTask(TaskType.MAP);
590     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
591   }
592 
593   @Test
testCommitAfterSucceeds()594   public void testCommitAfterSucceeds() {
595     mockTask = createMockTask(TaskType.REDUCE);
596     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
597   }
598 
599   @Test
testSpeculativeMapFetchFailure()600   public void testSpeculativeMapFetchFailure() {
601     // Setup a scenario where speculative task wins, first attempt killed
602     mockTask = createMockTask(TaskType.MAP);
603     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
604     assertEquals(2, taskAttempts.size());
605 
606     // speculative attempt retroactively fails from fetch failures
607     mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
608         TaskEventType.T_ATTEMPT_FAILED));
609 
610     assertTaskScheduledState();
611     assertEquals(3, taskAttempts.size());
612   }
613 
614   @Test
testSpeculativeMapMultipleSucceedFetchFailure()615   public void testSpeculativeMapMultipleSucceedFetchFailure() {
616     // Setup a scenario where speculative task wins, first attempt succeeds
617     mockTask = createMockTask(TaskType.MAP);
618     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
619     assertEquals(2, taskAttempts.size());
620 
621     // speculative attempt retroactively fails from fetch failures
622     mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
623         TaskEventType.T_ATTEMPT_FAILED));
624 
625     assertTaskScheduledState();
626     assertEquals(3, taskAttempts.size());
627   }
628 
629   @Test
testSpeculativeMapFailedFetchFailure()630   public void testSpeculativeMapFailedFetchFailure() {
631     // Setup a scenario where speculative task wins, first attempt succeeds
632     mockTask = createMockTask(TaskType.MAP);
633     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
634     assertEquals(2, taskAttempts.size());
635 
636     // speculative attempt retroactively fails from fetch failures
637     mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
638         TaskEventType.T_ATTEMPT_FAILED));
639 
640     assertTaskScheduledState();
641     assertEquals(3, taskAttempts.size());
642   }
643 
644   @Test
testFailedTransitions()645   public void testFailedTransitions() {
646     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
647         remoteJobConfFile, conf, taskAttemptListener, jobToken,
648         credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
649           @Override
650           protected int getMaxAttempts() {
651             return 1;
652           }
653     };
654     TaskId taskId = getNewTaskID();
655     scheduleTaskAttempt(taskId);
656     launchTaskAttempt(getLastAttempt().getAttemptId());
657 
658     // add three more speculative attempts
659     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
660         TaskEventType.T_ADD_SPEC_ATTEMPT));
661     launchTaskAttempt(getLastAttempt().getAttemptId());
662     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
663         TaskEventType.T_ADD_SPEC_ATTEMPT));
664     launchTaskAttempt(getLastAttempt().getAttemptId());
665     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
666         TaskEventType.T_ADD_SPEC_ATTEMPT));
667     launchTaskAttempt(getLastAttempt().getAttemptId());
668     assertEquals(4, taskAttempts.size());
669 
670     // have the first attempt fail, verify task failed due to no retries
671     MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
672     taskAttempt.setState(TaskAttemptState.FAILED);
673     mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
674         TaskEventType.T_ATTEMPT_FAILED));
675     assertEquals(TaskState.FAILED, mockTask.getState());
676 
677     // verify task can no longer be killed
678     mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
679     assertEquals(TaskState.FAILED, mockTask.getState());
680 
681     // verify speculative doesn't launch new tasks
682     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
683         TaskEventType.T_ADD_SPEC_ATTEMPT));
684     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
685         TaskEventType.T_ATTEMPT_LAUNCHED));
686     assertEquals(TaskState.FAILED, mockTask.getState());
687     assertEquals(4, taskAttempts.size());
688 
689     // verify attempt events from active tasks don't knock task out of FAILED
690     taskAttempt = taskAttempts.get(1);
691     taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
692     mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
693         TaskEventType.T_ATTEMPT_COMMIT_PENDING));
694     assertEquals(TaskState.FAILED, mockTask.getState());
695     taskAttempt.setState(TaskAttemptState.FAILED);
696     mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
697         TaskEventType.T_ATTEMPT_FAILED));
698     assertEquals(TaskState.FAILED, mockTask.getState());
699     taskAttempt = taskAttempts.get(2);
700     taskAttempt.setState(TaskAttemptState.SUCCEEDED);
701     mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
702         TaskEventType.T_ATTEMPT_SUCCEEDED));
703     assertEquals(TaskState.FAILED, mockTask.getState());
704     taskAttempt = taskAttempts.get(3);
705     taskAttempt.setState(TaskAttemptState.KILLED);
706     mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
707         TaskEventType.T_ATTEMPT_KILLED));
708     assertEquals(TaskState.FAILED, mockTask.getState());
709   }
710 
711   @Test
testCountersWithSpeculation()712   public void testCountersWithSpeculation() {
713     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
714         remoteJobConfFile, conf, taskAttemptListener, jobToken,
715         credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
716           @Override
717           protected int getMaxAttempts() {
718             return 1;
719           }
720     };
721     TaskId taskId = getNewTaskID();
722     scheduleTaskAttempt(taskId);
723     launchTaskAttempt(getLastAttempt().getAttemptId());
724     updateLastAttemptState(TaskAttemptState.RUNNING);
725     MockTaskAttemptImpl baseAttempt = getLastAttempt();
726 
727     // add a speculative attempt
728     mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
729         TaskEventType.T_ADD_SPEC_ATTEMPT));
730     launchTaskAttempt(getLastAttempt().getAttemptId());
731     updateLastAttemptState(TaskAttemptState.RUNNING);
732     MockTaskAttemptImpl specAttempt = getLastAttempt();
733     assertEquals(2, taskAttempts.size());
734 
735     Counters specAttemptCounters = new Counters();
736     Counter cpuCounter = specAttemptCounters.findCounter(
737         TaskCounter.CPU_MILLISECONDS);
738     cpuCounter.setValue(1000);
739     specAttempt.setCounters(specAttemptCounters);
740 
741     // have the spec attempt succeed but second attempt at 1.0 progress as well
742     commitTaskAttempt(specAttempt.getAttemptId());
743     specAttempt.setProgress(1.0f);
744     specAttempt.setState(TaskAttemptState.SUCCEEDED);
745     mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
746         TaskEventType.T_ATTEMPT_SUCCEEDED));
747     assertEquals(TaskState.SUCCEEDED, mockTask.getState());
748     baseAttempt.setProgress(1.0f);
749 
750     Counters taskCounters = mockTask.getCounters();
751     assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
752   }
753 }
754