1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.v2.app;
20 
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicLong;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.mapred.TaskCompletionEvent;
37 import org.apache.hadoop.mapreduce.Counters;
38 import org.apache.hadoop.mapreduce.JobACL;
39 import org.apache.hadoop.mapreduce.MRJobConfig;
40 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
41 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
42 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
43 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
44 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
45 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
46 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
47 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
48 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
49 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
50 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
51 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
52 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
53 import org.apache.hadoop.mapreduce.v2.app.job.Job;
54 import org.apache.hadoop.mapreduce.v2.app.job.Task;
55 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
56 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
57 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
58 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
59 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
60 import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
61 import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
62 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
63 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
64 import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
65 import org.apache.hadoop.security.UserGroupInformation;
66 import org.apache.hadoop.security.authorize.AccessControlList;
67 import org.apache.hadoop.service.CompositeService;
68 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
69 import org.apache.hadoop.yarn.api.records.ApplicationId;
70 import org.apache.hadoop.yarn.api.records.ContainerId;
71 import org.apache.hadoop.yarn.api.records.NodeId;
72 import org.apache.hadoop.yarn.api.records.Token;
73 import org.apache.hadoop.yarn.event.AsyncDispatcher;
74 import org.apache.hadoop.yarn.event.EventHandler;
75 import org.apache.hadoop.yarn.factories.RecordFactory;
76 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
77 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
78 import org.apache.hadoop.yarn.util.Clock;
79 import org.apache.hadoop.yarn.util.SystemClock;
80 import org.junit.Assert;
81 import org.junit.Test;
82 
83 @SuppressWarnings({"unchecked", "rawtypes"})
84 public class TestRuntimeEstimators {
85 
86   private static int INITIAL_NUMBER_FREE_SLOTS = 600;
87   private static int MAP_SLOT_REQUIREMENT = 3;
88   // this has to be at least as much as map slot requirement
89   private static int REDUCE_SLOT_REQUIREMENT = 4;
90   private static int MAP_TASKS = 200;
91   private static int REDUCE_TASKS = 150;
92 
93   MockClock clock;
94 
95   Job myJob;
96 
97   AppContext myAppContext;
98 
99   private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
100 
101   private final AtomicInteger slotsInUse = new AtomicInteger(0);
102 
103   AsyncDispatcher dispatcher;
104 
105   DefaultSpeculator speculator;
106 
107   TaskRuntimeEstimator estimator;
108 
109   // This is a huge kluge.  The real implementations have a decent approach
110   private final AtomicInteger completedMaps = new AtomicInteger(0);
111   private final AtomicInteger completedReduces = new AtomicInteger(0);
112 
113   private final AtomicInteger successfulSpeculations
114       = new AtomicInteger(0);
115   private final AtomicLong taskTimeSavedBySpeculation
116       = new AtomicLong(0L);
117 
118   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
119 
coreTestEstimator(TaskRuntimeEstimator testedEstimator, int expectedSpeculations)120   private void coreTestEstimator
121       (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
122     estimator = testedEstimator;
123 	clock = new MockClock();
124 	dispatcher = new AsyncDispatcher();
125     myJob = null;
126     slotsInUse.set(0);
127     completedMaps.set(0);
128     completedReduces.set(0);
129     successfulSpeculations.set(0);
130     taskTimeSavedBySpeculation.set(0);
131 
132     clock.advanceTime(1000);
133 
134     Configuration conf = new Configuration();
135 
136     myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
137     myJob = myAppContext.getAllJobs().values().iterator().next();
138 
139     estimator.contextualize(conf, myAppContext);
140 
141     conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 500L);
142     conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 5000L);
143     conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1);
144     conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.001);
145     conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 5);
146     speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
147     Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_NO_SPECULATE value",
148         500L, speculator.getSoonestRetryAfterNoSpeculate());
149     Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_SPECULATE value",
150         5000L, speculator.getSoonestRetryAfterSpeculate());
151     Assert.assertEquals(speculator.getProportionRunningTasksSpeculatable(),
152         0.1, 0.00001);
153     Assert.assertEquals(speculator.getProportionTotalTasksSpeculatable(),
154         0.001, 0.00001);
155     Assert.assertEquals("wrong SPECULATIVE_MINIMUM_ALLOWED_TASKS value",
156         5, speculator.getMinimumAllowedSpeculativeTasks());
157 
158     dispatcher.register(Speculator.EventType.class, speculator);
159 
160     dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
161 
162     dispatcher.init(conf);
163     dispatcher.start();
164 
165 
166 
167     speculator.init(conf);
168     speculator.start();
169 
170     // Now that the plumbing is hooked up, we do the following:
171     //  do until all tasks are finished, ...
172     //  1: If we have spare capacity, assign as many map tasks as we can, then
173     //     assign as many reduce tasks as we can.  Note that an odd reduce
174     //     task might be started while there are still map tasks, because
175     //     map tasks take 3 slots and reduce tasks 2 slots.
176     //  2: Send a speculation event for every task attempt that's running
177     //  note that new attempts might get started by the speculator
178 
179     // discover undone tasks
180     int undoneMaps = MAP_TASKS;
181     int undoneReduces = REDUCE_TASKS;
182 
183     // build a task sequence where all the maps precede any of the reduces
184     List<Task> allTasksSequence = new LinkedList<Task>();
185 
186     allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
187     allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());
188 
189     while (undoneMaps + undoneReduces > 0) {
190       undoneMaps = 0; undoneReduces = 0;
191       // start all attempts which are new but for which there is enough slots
192       for (Task task : allTasksSequence) {
193         if (!task.isFinished()) {
194           if (task.getType() == TaskType.MAP) {
195             ++undoneMaps;
196           } else {
197             ++undoneReduces;
198           }
199         }
200         for (TaskAttempt attempt : task.getAttempts().values()) {
201           if (attempt.getState() == TaskAttemptState.NEW
202               && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
203                     >= taskTypeSlots(task.getType())) {
204             MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
205             SpeculatorEvent event
206                 = new SpeculatorEvent(attempt.getID(), false, clock.getTime());
207             speculator.handle(event);
208             attemptImpl.startUp();
209           } else {
210             // If a task attempt is in progress we should send the news to
211             // the Speculator.
212             TaskAttemptStatus status = new TaskAttemptStatus();
213             status.id = attempt.getID();
214             status.progress = attempt.getProgress();
215             status.stateString = attempt.getState().name();
216             status.taskState = attempt.getState();
217             SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
218             speculator.handle(event);
219           }
220         }
221       }
222 
223       long startTime = System.currentTimeMillis();
224 
225       // drain the speculator event queue
226       while (!speculator.eventQueueEmpty()) {
227         Thread.yield();
228         if (System.currentTimeMillis() > startTime + 130000) {
229           return;
230         }
231       }
232 
233       clock.advanceTime(1000L);
234 
235       if (clock.getTime() % 10000L == 0L) {
236         speculator.scanForSpeculations();
237       }
238     }
239 
240     Assert.assertEquals("We got the wrong number of successful speculations.",
241         expectedSpeculations, successfulSpeculations.get());
242   }
243 
244   @Test
testLegacyEstimator()245   public void testLegacyEstimator() throws Exception {
246     TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
247     coreTestEstimator(specificEstimator, 3);
248   }
249 
250   @Test
testExponentialEstimator()251   public void testExponentialEstimator() throws Exception {
252     TaskRuntimeEstimator specificEstimator
253         = new ExponentiallySmoothedTaskRuntimeEstimator();
254     coreTestEstimator(specificEstimator, 3);
255   }
256 
taskTypeSlots(TaskType type)257   int taskTypeSlots(TaskType type) {
258     return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
259   }
260 
261   class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
262 
263     @Override
handle(TaskEvent event)264     public void handle(TaskEvent event) {
265       TaskId taskID = event.getTaskID();
266       Task task = myJob.getTask(taskID);
267 
268       Assert.assertEquals
269           ("Wrong type event", TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());
270 
271       System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
272 
273       addAttempt(task);
274     }
275   }
276 
addAttempt(Task task)277   void addAttempt(Task task) {
278     MyTaskImpl myTask = (MyTaskImpl) task;
279 
280     myTask.addAttempt();
281   }
282 
283   class MyTaskImpl implements Task {
284     private final TaskId taskID;
285     private final Map<TaskAttemptId, TaskAttempt> attempts
286         = new ConcurrentHashMap<TaskAttemptId, TaskAttempt>(4);
287 
MyTaskImpl(JobId jobID, int index, TaskType type)288     MyTaskImpl(JobId jobID, int index, TaskType type) {
289       taskID = recordFactory.newRecordInstance(TaskId.class);
290       taskID.setId(index);
291       taskID.setTaskType(type);
292       taskID.setJobId(jobID);
293     }
294 
addAttempt()295     void addAttempt() {
296       TaskAttempt taskAttempt
297           = new MyTaskAttemptImpl(taskID, attempts.size(), clock);
298       TaskAttemptId taskAttemptID = taskAttempt.getID();
299 
300       attempts.put(taskAttemptID, taskAttempt);
301 
302       System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());
303 
304       SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
305       dispatcher.getEventHandler().handle(event);
306     }
307 
308     @Override
getID()309     public TaskId getID() {
310       return taskID;
311     }
312 
313     @Override
getReport()314     public TaskReport getReport() {
315       throw new UnsupportedOperationException("Not supported yet.");
316     }
317 
318     @Override
getCounters()319     public Counters getCounters() {
320       throw new UnsupportedOperationException("Not supported yet.");
321     }
322 
323     @Override
getProgress()324     public float getProgress() {
325       float result = 0.0F;
326 
327 
328      for (TaskAttempt attempt : attempts.values()) {
329        result = Math.max(result, attempt.getProgress());
330      }
331 
332      return result;
333     }
334 
335     @Override
getType()336     public TaskType getType() {
337       return taskID.getTaskType();
338     }
339 
340     @Override
getAttempts()341     public Map<TaskAttemptId, TaskAttempt> getAttempts() {
342       Map<TaskAttemptId, TaskAttempt> result
343           = new HashMap<TaskAttemptId, TaskAttempt>(attempts.size());
344       result.putAll(attempts);
345       return result;
346     }
347 
348     @Override
getAttempt(TaskAttemptId attemptID)349     public TaskAttempt getAttempt(TaskAttemptId attemptID) {
350       return attempts.get(attemptID);
351     }
352 
353     @Override
isFinished()354     public boolean isFinished() {
355       for (TaskAttempt attempt : attempts.values()) {
356         if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
357           return true;
358         }
359       }
360 
361       return false;
362     }
363 
364     @Override
canCommit(TaskAttemptId taskAttemptID)365     public boolean canCommit(TaskAttemptId taskAttemptID) {
366       throw new UnsupportedOperationException("Not supported yet.");
367     }
368 
369     @Override
getState()370     public TaskState getState() {
371       throw new UnsupportedOperationException("Not supported yet.");
372     }
373 
374   }
375 
376   class MyJobImpl implements Job {
377     private final JobId jobID;
378     private final Map<TaskId, Task> allTasks = new HashMap<TaskId, Task>();
379     private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
380     private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
381 
MyJobImpl(JobId jobID, int numMaps, int numReduces)382     MyJobImpl(JobId jobID, int numMaps, int numReduces) {
383       this.jobID = jobID;
384       for (int i = 0; i < numMaps; ++i) {
385         Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
386         mapTasks.put(newTask.getID(), newTask);
387         allTasks.put(newTask.getID(), newTask);
388       }
389       for (int i = 0; i < numReduces; ++i) {
390         Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
391         reduceTasks.put(newTask.getID(), newTask);
392         allTasks.put(newTask.getID(), newTask);
393       }
394 
395       // give every task an attempt
396       for (Task task : allTasks.values()) {
397         MyTaskImpl myTaskImpl = (MyTaskImpl) task;
398         myTaskImpl.addAttempt();
399       }
400     }
401 
402     @Override
getID()403     public JobId getID() {
404       return jobID;
405     }
406 
407     @Override
getState()408     public JobState getState() {
409       throw new UnsupportedOperationException("Not supported yet.");
410     }
411 
412     @Override
getReport()413     public JobReport getReport() {
414       throw new UnsupportedOperationException("Not supported yet.");
415     }
416 
417     @Override
getProgress()418     public float getProgress() {
419       return 0;
420     }
421 
422     @Override
getAllCounters()423     public Counters getAllCounters() {
424       throw new UnsupportedOperationException("Not supported yet.");
425     }
426 
427     @Override
getTasks()428     public Map<TaskId, Task> getTasks() {
429       return allTasks;
430     }
431 
432     @Override
getTasks(TaskType taskType)433     public Map<TaskId, Task> getTasks(TaskType taskType) {
434       return taskType == TaskType.MAP ? mapTasks : reduceTasks;
435     }
436 
437     @Override
getTask(TaskId taskID)438     public Task getTask(TaskId taskID) {
439       return allTasks.get(taskID);
440     }
441 
442     @Override
getDiagnostics()443     public List<String> getDiagnostics() {
444       throw new UnsupportedOperationException("Not supported yet.");
445     }
446 
447     @Override
getCompletedMaps()448     public int getCompletedMaps() {
449       return completedMaps.get();
450     }
451 
452     @Override
getCompletedReduces()453     public int getCompletedReduces() {
454       return completedReduces.get();
455     }
456 
457     @Override
458     public TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents)459             getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
460       throw new UnsupportedOperationException("Not supported yet.");
461     }
462 
463     @Override
464     public TaskCompletionEvent[]
getMapAttemptCompletionEvents(int startIndex, int maxEvents)465             getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
466       throw new UnsupportedOperationException("Not supported yet.");
467     }
468 
469     @Override
getName()470     public String getName() {
471       throw new UnsupportedOperationException("Not supported yet.");
472     }
473 
474     @Override
getQueueName()475     public String getQueueName() {
476       throw new UnsupportedOperationException("Not supported yet.");
477     }
478 
479     @Override
getTotalMaps()480     public int getTotalMaps() {
481       return mapTasks.size();
482     }
483 
484     @Override
getTotalReduces()485     public int getTotalReduces() {
486       return reduceTasks.size();
487     }
488 
489     @Override
isUber()490     public boolean isUber() {
491       return false;
492     }
493 
494     @Override
checkAccess(UserGroupInformation callerUGI, JobACL jobOperation)495     public boolean checkAccess(UserGroupInformation callerUGI,
496         JobACL jobOperation) {
497       return true;
498     }
499 
500     @Override
getUserName()501     public String getUserName() {
502       throw new UnsupportedOperationException("Not supported yet.");
503     }
504 
505     @Override
getConfFile()506     public Path getConfFile() {
507       throw new UnsupportedOperationException("Not supported yet.");
508     }
509 
510     @Override
getJobACLs()511     public Map<JobACL, AccessControlList> getJobACLs() {
512       throw new UnsupportedOperationException("Not supported yet.");
513     }
514 
515     @Override
getAMInfos()516     public List<AMInfo> getAMInfos() {
517       throw new UnsupportedOperationException("Not supported yet.");
518     }
519 
520     @Override
loadConfFile()521     public Configuration loadConfFile() {
522       throw new UnsupportedOperationException();
523     }
524 
525     @Override
setQueueName(String queueName)526     public void setQueueName(String queueName) {
527       // do nothing
528     }
529   }
530 
531   /*
532    * We follow the pattern of the real XxxImpl .  We create a job and initialize
533    * it with a full suite of tasks which in turn have one attempt each in the
534    * NEW state.  Attempts transition only from NEW to RUNNING to SUCCEEDED .
535    */
536   class MyTaskAttemptImpl implements TaskAttempt {
537     private final TaskAttemptId myAttemptID;
538 
539     long startMockTime = Long.MIN_VALUE;
540 
541     long shuffleCompletedTime = Long.MAX_VALUE;
542 
543     TaskAttemptState overridingState = TaskAttemptState.NEW;
544 
MyTaskAttemptImpl(TaskId taskID, int index, Clock clock)545     MyTaskAttemptImpl(TaskId taskID, int index, Clock clock) {
546       myAttemptID = recordFactory.newRecordInstance(TaskAttemptId.class);
547       myAttemptID.setId(index);
548       myAttemptID.setTaskId(taskID);
549     }
550 
startUp()551     void startUp() {
552       startMockTime = clock.getTime();
553       overridingState = null;
554 
555       slotsInUse.addAndGet(taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
556 
557       System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());
558 
559       SpeculatorEvent event = new SpeculatorEvent(getID().getTaskId(), -1);
560       dispatcher.getEventHandler().handle(event);
561     }
562 
563     @Override
getNodeId()564     public NodeId getNodeId() throws UnsupportedOperationException{
565       throw new UnsupportedOperationException();
566     }
567 
568     @Override
getID()569     public TaskAttemptId getID() {
570       return myAttemptID;
571     }
572 
573     @Override
getReport()574     public TaskAttemptReport getReport() {
575       throw new UnsupportedOperationException("Not supported yet.");
576     }
577 
578     @Override
getDiagnostics()579     public List<String> getDiagnostics() {
580       throw new UnsupportedOperationException("Not supported yet.");
581     }
582 
583     @Override
getCounters()584     public Counters getCounters() {
585       throw new UnsupportedOperationException("Not supported yet.");
586     }
587 
588     @Override
getShufflePort()589     public int getShufflePort() {
590       throw new UnsupportedOperationException("Not supported yet.");
591     }
592 
getCodeRuntime()593     private float getCodeRuntime() {
594       int taskIndex = myAttemptID.getTaskId().getId();
595       int attemptIndex = myAttemptID.getId();
596 
597       float result = 200.0F;
598 
599       switch (taskIndex % 4) {
600         case 0:
601           if (taskIndex % 40 == 0 && attemptIndex == 0) {
602             result = 600.0F;
603             break;
604           }
605 
606           break;
607         case 2:
608           break;
609 
610         case 1:
611           result = 150.0F;
612           break;
613 
614         case 3:
615           result = 250.0F;
616           break;
617       }
618 
619       return result;
620     }
621 
getMapProgress()622     private float getMapProgress() {
623       float runtime = getCodeRuntime();
624 
625       return Math.min
626           ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
627     }
628 
getReduceProgress()629     private float getReduceProgress() {
630       Job job = myAppContext.getJob(myAttemptID.getTaskId().getJobId());
631       float runtime = getCodeRuntime();
632 
633       Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
634 
635       int numberMaps = allMapTasks.size();
636       int numberDoneMaps = 0;
637 
638       for (Task mapTask : allMapTasks) {
639         if (mapTask.isFinished()) {
640           ++numberDoneMaps;
641         }
642       }
643 
644       if (numberMaps == numberDoneMaps) {
645         shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
646 
647         return Math.min
648             ((float) (clock.getTime() - shuffleCompletedTime)
649                         / (runtime * 2000.0F) + 0.5F,
650              1.0F);
651       } else {
652         return ((float) numberDoneMaps) / numberMaps * 0.5F;
653       }
654     }
655 
656     // we compute progress from time and an algorithm now
657     @Override
getProgress()658     public float getProgress() {
659       if (overridingState == TaskAttemptState.NEW) {
660         return 0.0F;
661       }
662       return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
663     }
664 
665     @Override
getPhase()666     public Phase getPhase() {
667       throw new UnsupportedOperationException("Not supported yet.");
668     }
669 
670     @Override
getState()671     public TaskAttemptState getState() {
672       if (overridingState != null) {
673         return overridingState;
674       }
675       TaskAttemptState result
676           = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;
677 
678       if (result == TaskAttemptState.SUCCEEDED) {
679         overridingState = TaskAttemptState.SUCCEEDED;
680 
681         System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");
682 
683         slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
684 
685         (myAttemptID.getTaskId().getTaskType() == TaskType.MAP
686             ? completedMaps : completedReduces).getAndIncrement();
687 
688         // check for a spectacularly successful speculation
689         TaskId taskID = myAttemptID.getTaskId();
690 
691         Task task = myJob.getTask(taskID);
692 
693         for (TaskAttempt otherAttempt : task.getAttempts().values()) {
694           if (otherAttempt != this
695               && otherAttempt.getState() == TaskAttemptState.RUNNING) {
696             // we had two instances running.  Try to determine how much
697             //  we might have saved by speculation
698             if (getID().getId() > otherAttempt.getID().getId()) {
699               // the speculation won
700               successfulSpeculations.getAndIncrement();
701               float hisProgress = otherAttempt.getProgress();
702               long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
703               System.out.println("TLTRE:A speculation finished at time "
704                   + clock.getTime()
705                   + ".  The stalled attempt is at " + (hisProgress * 100.0)
706                   + "% progress, and it started at "
707                   + hisStartTime + ", which is "
708                   + (clock.getTime() - hisStartTime) + " ago.");
709               long originalTaskEndEstimate
710                   = (hisStartTime
711                       + estimator.estimatedRuntime(otherAttempt.getID()));
712               System.out.println(
713                   "TLTRE: We would have expected the original attempt to take "
714                   + estimator.estimatedRuntime(otherAttempt.getID())
715                   + ", finishing at " + originalTaskEndEstimate);
716               long estimatedSavings = originalTaskEndEstimate - clock.getTime();
717               taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
718               System.out.println("TLTRE: The task is " + task.getID());
719               slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
720               ((MyTaskAttemptImpl)otherAttempt).overridingState
721                   = TaskAttemptState.KILLED;
722             } else {
723               System.out.println(
724                   "TLTRE: The normal attempt beat the speculation in "
725                   + task.getID());
726             }
727           }
728         }
729       }
730 
731       return result;
732     }
733 
734     @Override
isFinished()735     public boolean isFinished() {
736       return getProgress() == 1.0F;
737     }
738 
739     @Override
getAssignedContainerID()740     public ContainerId getAssignedContainerID() {
741       throw new UnsupportedOperationException("Not supported yet.");
742     }
743 
744     @Override
getNodeHttpAddress()745     public String getNodeHttpAddress() {
746       throw new UnsupportedOperationException("Not supported yet.");
747     }
748 
749     @Override
getNodeRackName()750     public String getNodeRackName() {
751       throw new UnsupportedOperationException("Not supported yet.");
752     }
753 
754     @Override
getLaunchTime()755     public long getLaunchTime() {
756       return startMockTime;
757     }
758 
759     @Override
getFinishTime()760     public long getFinishTime() {
761       throw new UnsupportedOperationException("Not supported yet.");
762     }
763 
764     @Override
getShuffleFinishTime()765     public long getShuffleFinishTime() {
766       throw new UnsupportedOperationException("Not supported yet.");
767     }
768 
769     @Override
getSortFinishTime()770     public long getSortFinishTime() {
771       throw new UnsupportedOperationException("Not supported yet.");
772     }
773 
774     @Override
getAssignedContainerMgrAddress()775     public String getAssignedContainerMgrAddress() {
776       throw new UnsupportedOperationException("Not supported yet.");
777     }
778   }
779 
780   static class MockClock implements Clock {
781     private long currentTime = 0;
782 
getTime()783     public long getTime() {
784       return currentTime;
785     }
786 
setMeasuredTime(long newTime)787     void setMeasuredTime(long newTime) {
788       currentTime = newTime;
789     }
790 
advanceTime(long increment)791     void advanceTime(long increment) {
792       currentTime += increment;
793     }
794   }
795 
796   class MyAppMaster extends CompositeService {
797     final Clock clock;
MyAppMaster(Clock clock)798       public MyAppMaster(Clock clock) {
799         super(MyAppMaster.class.getName());
800         if (clock == null) {
801           clock = new SystemClock();
802         }
803       this.clock = clock;
804       LOG.info("Created MyAppMaster");
805     }
806   }
807 
808   class MyAppContext implements AppContext {
809     private final ApplicationAttemptId myAppAttemptID;
810     private final ApplicationId myApplicationID;
811     private final JobId myJobID;
812     private final Map<JobId, Job> allJobs;
813 
MyAppContext(int numberMaps, int numberReduces)814     MyAppContext(int numberMaps, int numberReduces) {
815       myApplicationID = ApplicationId.newInstance(clock.getTime(), 1);
816 
817       myAppAttemptID = ApplicationAttemptId.newInstance(myApplicationID, 0);
818       myJobID = recordFactory.newRecordInstance(JobId.class);
819       myJobID.setAppId(myApplicationID);
820 
821       Job myJob
822           = new MyJobImpl(myJobID, numberMaps, numberReduces);
823 
824       allJobs = Collections.singletonMap(myJobID, myJob);
825     }
826 
827     @Override
getApplicationAttemptId()828     public ApplicationAttemptId getApplicationAttemptId() {
829       return myAppAttemptID;
830     }
831 
832     @Override
getApplicationID()833     public ApplicationId getApplicationID() {
834       return myApplicationID;
835     }
836 
837     @Override
getJob(JobId jobID)838     public Job getJob(JobId jobID) {
839       return allJobs.get(jobID);
840     }
841 
842     @Override
getAllJobs()843     public Map<JobId, Job> getAllJobs() {
844       return allJobs;
845     }
846 
847     @Override
getEventHandler()848     public EventHandler getEventHandler() {
849       return dispatcher.getEventHandler();
850     }
851 
852     @Override
getUser()853     public CharSequence getUser() {
854       throw new UnsupportedOperationException("Not supported yet.");
855     }
856 
857     @Override
getClock()858     public Clock getClock() {
859       return clock;
860     }
861 
862     @Override
getApplicationName()863     public String getApplicationName() {
864       return null;
865     }
866 
867     @Override
getStartTime()868     public long getStartTime() {
869       return 0;
870     }
871 
872     @Override
getClusterInfo()873     public ClusterInfo getClusterInfo() {
874       return new ClusterInfo();
875     }
876 
877     @Override
getBlacklistedNodes()878     public Set<String> getBlacklistedNodes() {
879       return null;
880     }
881 
882     @Override
getClientToAMTokenSecretManager()883     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
884       return null;
885     }
886 
887     @Override
isLastAMRetry()888     public boolean isLastAMRetry() {
889       return false;
890     }
891 
892     @Override
hasSuccessfullyUnregistered()893     public boolean hasSuccessfullyUnregistered() {
894       // bogus - Not Required
895       return true;
896     }
897 
898     @Override
getNMHostname()899     public String getNMHostname() {
900       // bogus - Not Required
901       return null;
902     }
903   }
904 }
905