/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 
19 package org.apache.hadoop.mapreduce.v2.app.speculate;
20 
21 import java.lang.reflect.Constructor;
22 import java.lang.reflect.InvocationTargetException;
23 import java.util.HashSet;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.mapreduce.MRJobConfig;
37 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
38 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
39 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
40 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
41 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
42 import org.apache.hadoop.mapreduce.v2.app.AppContext;
43 import org.apache.hadoop.mapreduce.v2.app.job.Job;
44 import org.apache.hadoop.mapreduce.v2.app.job.Task;
45 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
46 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
47 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
48 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
49 import org.apache.hadoop.service.AbstractService;
50 import org.apache.hadoop.yarn.event.EventHandler;
51 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
52 import org.apache.hadoop.yarn.util.Clock;
53 
54 import com.google.common.annotations.VisibleForTesting;
55 
56 public class DefaultSpeculator extends AbstractService implements
57     Speculator {
58 
59   private static final long ON_SCHEDULE = Long.MIN_VALUE;
60   private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
61   private static final long TOO_NEW = Long.MIN_VALUE + 2;
62   private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
63   private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
64   private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
65 
66   private long soonestRetryAfterNoSpeculate;
67   private long soonestRetryAfterSpeculate;
68   private double proportionRunningTasksSpeculatable;
69   private double proportionTotalTasksSpeculatable;
70   private int  minimumAllowedSpeculativeTasks;
71 
72   private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
73 
74   private final ConcurrentMap<TaskId, Boolean> runningTasks
75       = new ConcurrentHashMap<TaskId, Boolean>();
76 
77   // Used to track any TaskAttempts that aren't heart-beating for a while, so
78   // that we can aggressively speculate instead of waiting for task-timeout.
79   private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics>
80       runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId,
81           TaskAttemptHistoryStatistics>();
82   // Regular heartbeat from tasks is every 3 secs. So if we don't get a
83   // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
84   // in progress.
85   private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
86 
87   // These are the current needs, not the initial needs.  For each job, these
88   //  record the number of attempts that exist and that are actively
89   //  waiting for a container [as opposed to running or finished]
90   private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds
91       = new ConcurrentHashMap<JobId, AtomicInteger>();
92   private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds
93       = new ConcurrentHashMap<JobId, AtomicInteger>();
94 
95   private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>();
96 
97   private final Configuration conf;
98   private AppContext context;
99   private Thread speculationBackgroundThread = null;
100   private volatile boolean stopped = false;
101   private BlockingQueue<SpeculatorEvent> eventQueue
102       = new LinkedBlockingQueue<SpeculatorEvent>();
103   private TaskRuntimeEstimator estimator;
104 
105   private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
106 
107   private final Clock clock;
108 
109   private final EventHandler<TaskEvent> eventHandler;
110 
DefaultSpeculator(Configuration conf, AppContext context)111   public DefaultSpeculator(Configuration conf, AppContext context) {
112     this(conf, context, context.getClock());
113   }
114 
DefaultSpeculator(Configuration conf, AppContext context, Clock clock)115   public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
116     this(conf, context, getEstimator(conf, context), clock);
117   }
118 
getEstimator(Configuration conf, AppContext context)119   static private TaskRuntimeEstimator getEstimator
120       (Configuration conf, AppContext context) {
121     TaskRuntimeEstimator estimator;
122 
123     try {
124       // "yarn.mapreduce.job.task.runtime.estimator.class"
125       Class<? extends TaskRuntimeEstimator> estimatorClass
126           = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
127                           LegacyTaskRuntimeEstimator.class,
128                           TaskRuntimeEstimator.class);
129 
130       Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
131           = estimatorClass.getConstructor();
132 
133       estimator = estimatorConstructor.newInstance();
134 
135       estimator.contextualize(conf, context);
136     } catch (InstantiationException ex) {
137       LOG.error("Can't make a speculation runtime estimator", ex);
138       throw new YarnRuntimeException(ex);
139     } catch (IllegalAccessException ex) {
140       LOG.error("Can't make a speculation runtime estimator", ex);
141       throw new YarnRuntimeException(ex);
142     } catch (InvocationTargetException ex) {
143       LOG.error("Can't make a speculation runtime estimator", ex);
144       throw new YarnRuntimeException(ex);
145     } catch (NoSuchMethodException ex) {
146       LOG.error("Can't make a speculation runtime estimator", ex);
147       throw new YarnRuntimeException(ex);
148     }
149 
150   return estimator;
151   }
152 
153   // This constructor is designed to be called by other constructors.
154   //  However, it's public because we do use it in the test cases.
155   // Normally we figure out our own estimator.
DefaultSpeculator(Configuration conf, AppContext context, TaskRuntimeEstimator estimator, Clock clock)156   public DefaultSpeculator
157       (Configuration conf, AppContext context,
158        TaskRuntimeEstimator estimator, Clock clock) {
159     super(DefaultSpeculator.class.getName());
160 
161     this.conf = conf;
162     this.context = context;
163     this.estimator = estimator;
164     this.clock = clock;
165     this.eventHandler = context.getEventHandler();
166     this.soonestRetryAfterNoSpeculate =
167         conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
168                 MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
169     this.soonestRetryAfterSpeculate =
170         conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
171                 MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
172     this.proportionRunningTasksSpeculatable =
173         conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
174                 MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
175     this.proportionTotalTasksSpeculatable =
176         conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
177                 MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
178     this.minimumAllowedSpeculativeTasks =
179         conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
180                 MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
181   }
182 
183 /*   *************************************************************    */
184 
185   // This is the task-mongering that creates the two new threads -- one for
186   //  processing events from the event queue and one for periodically
187   //  looking for speculation opportunities
188 
189   @Override
serviceStart()190   protected void serviceStart() throws Exception {
191     Runnable speculationBackgroundCore
192         = new Runnable() {
193             @Override
194             public void run() {
195               while (!stopped && !Thread.currentThread().isInterrupted()) {
196                 long backgroundRunStartTime = clock.getTime();
197                 try {
198                   int speculations = computeSpeculations();
199                   long mininumRecomp
200                       = speculations > 0 ? soonestRetryAfterSpeculate
201                                          : soonestRetryAfterNoSpeculate;
202 
203                   long wait = Math.max(mininumRecomp,
204                         clock.getTime() - backgroundRunStartTime);
205 
206                   if (speculations > 0) {
207                     LOG.info("We launched " + speculations
208                         + " speculations.  Sleeping " + wait + " milliseconds.");
209                   }
210 
211                   Object pollResult
212                       = scanControl.poll(wait, TimeUnit.MILLISECONDS);
213                 } catch (InterruptedException e) {
214                   if (!stopped) {
215                     LOG.error("Background thread returning, interrupted", e);
216                   }
217                   return;
218                 }
219               }
220             }
221           };
222     speculationBackgroundThread = new Thread
223         (speculationBackgroundCore, "DefaultSpeculator background processing");
224     speculationBackgroundThread.start();
225 
226     super.serviceStart();
227   }
228 
229   @Override
serviceStop()230   protected void serviceStop()throws Exception {
231       stopped = true;
232     // this could be called before background thread is established
233     if (speculationBackgroundThread != null) {
234       speculationBackgroundThread.interrupt();
235     }
236     super.serviceStop();
237   }
238 
239   @Override
handleAttempt(TaskAttemptStatus status)240   public void handleAttempt(TaskAttemptStatus status) {
241     long timestamp = clock.getTime();
242     statusUpdate(status, timestamp);
243   }
244 
245   // This section is not part of the Speculator interface; it's used only for
246   //  testing
eventQueueEmpty()247   public boolean eventQueueEmpty() {
248     return eventQueue.isEmpty();
249   }
250 
251   // This interface is intended to be used only for test cases.
scanForSpeculations()252   public void scanForSpeculations() {
253     LOG.info("We got asked to run a debug speculation scan.");
254     // debug
255     System.out.println("We got asked to run a debug speculation scan.");
256     System.out.println("There are " + scanControl.size()
257         + " events stacked already.");
258     scanControl.add(new Object());
259     Thread.yield();
260   }
261 
262 
263 /*   *************************************************************    */
264 
265   // This section contains the code that gets run for a SpeculatorEvent
266 
containerNeed(TaskId taskID)267   private AtomicInteger containerNeed(TaskId taskID) {
268     JobId jobID = taskID.getJobId();
269     TaskType taskType = taskID.getTaskType();
270 
271     ConcurrentMap<JobId, AtomicInteger> relevantMap
272         = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
273 
274     AtomicInteger result = relevantMap.get(jobID);
275 
276     if (result == null) {
277       relevantMap.putIfAbsent(jobID, new AtomicInteger(0));
278       result = relevantMap.get(jobID);
279     }
280 
281     return result;
282   }
283 
processSpeculatorEvent(SpeculatorEvent event)284   private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
285     switch (event.getType()) {
286       case ATTEMPT_STATUS_UPDATE:
287         statusUpdate(event.getReportedStatus(), event.getTimestamp());
288         break;
289 
290       case TASK_CONTAINER_NEED_UPDATE:
291       {
292         AtomicInteger need = containerNeed(event.getTaskID());
293         need.addAndGet(event.containersNeededChange());
294         break;
295       }
296 
297       case ATTEMPT_START:
298       {
299         LOG.info("ATTEMPT_START " + event.getTaskID());
300         estimator.enrollAttempt
301             (event.getReportedStatus(), event.getTimestamp());
302         break;
303       }
304 
305       case JOB_CREATE:
306       {
307         LOG.info("JOB_CREATE " + event.getJobID());
308         estimator.contextualize(getConfig(), context);
309         break;
310       }
311     }
312   }
313 
314   /**
315    * Absorbs one TaskAttemptStatus
316    *
317    * @param reportedStatus the status report that we got from a task attempt
318    *        that we want to fold into the speculation data for this job
319    * @param timestamp the time this status corresponds to.  This matters
320    *        because statuses contain progress.
321    */
statusUpdate(TaskAttemptStatus reportedStatus, long timestamp)322   protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
323 
324     String stateString = reportedStatus.taskState.toString();
325 
326     TaskAttemptId attemptID = reportedStatus.id;
327     TaskId taskID = attemptID.getTaskId();
328     Job job = context.getJob(taskID.getJobId());
329 
330     if (job == null) {
331       return;
332     }
333 
334     Task task = job.getTask(taskID);
335 
336     if (task == null) {
337       return;
338     }
339 
340     estimator.updateAttempt(reportedStatus, timestamp);
341 
342     if (stateString.equals(TaskAttemptState.RUNNING.name())) {
343       runningTasks.putIfAbsent(taskID, Boolean.TRUE);
344     } else {
345       runningTasks.remove(taskID, Boolean.TRUE);
346       if (!stateString.equals(TaskAttemptState.STARTING.name())) {
347         runningTaskAttemptStatistics.remove(attemptID);
348       }
349     }
350   }
351 
352 /*   *************************************************************    */
353 
354 // This is the code section that runs periodically and adds speculations for
355 //  those jobs that need them.
356 
357 
358   // This can return a few magic values for tasks that shouldn't speculate:
359   //  returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
360   //     considering speculating this task
361   //  returns ALREADY_SPECULATING if that is true.  This has priority.
362   //  returns TOO_NEW if our companion task hasn't gotten any information
363   //  returns PROGRESS_IS_GOOD if the task is sailing through
364   //  returns NOT_RUNNING if the task is not running
365   //
366   // All of these values are negative.  Any value that should be allowed to
367   //  speculate is 0 or positive.
speculationValue(TaskId taskID, long now)368   private long speculationValue(TaskId taskID, long now) {
369     Job job = context.getJob(taskID.getJobId());
370     Task task = job.getTask(taskID);
371     Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
372     long acceptableRuntime = Long.MIN_VALUE;
373     long result = Long.MIN_VALUE;
374 
375     if (!mayHaveSpeculated.contains(taskID)) {
376       acceptableRuntime = estimator.thresholdRuntime(taskID);
377       if (acceptableRuntime == Long.MAX_VALUE) {
378         return ON_SCHEDULE;
379       }
380     }
381 
382     TaskAttemptId runningTaskAttemptID = null;
383 
384     int numberRunningAttempts = 0;
385 
386     for (TaskAttempt taskAttempt : attempts.values()) {
387       if (taskAttempt.getState() == TaskAttemptState.RUNNING
388           || taskAttempt.getState() == TaskAttemptState.STARTING) {
389         if (++numberRunningAttempts > 1) {
390           return ALREADY_SPECULATING;
391         }
392         runningTaskAttemptID = taskAttempt.getID();
393 
394         long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
395 
396         long taskAttemptStartTime
397             = estimator.attemptEnrolledTime(runningTaskAttemptID);
398         if (taskAttemptStartTime > now) {
399           // This background process ran before we could process the task
400           //  attempt status change that chronicles the attempt start
401           return TOO_NEW;
402         }
403 
404         long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
405 
406         long estimatedReplacementEndTime
407             = now + estimator.estimatedNewAttemptRuntime(taskID);
408 
409         float progress = taskAttempt.getProgress();
410         TaskAttemptHistoryStatistics data =
411             runningTaskAttemptStatistics.get(runningTaskAttemptID);
412         if (data == null) {
413           runningTaskAttemptStatistics.put(runningTaskAttemptID,
414             new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
415         } else {
416           if (estimatedRunTime == data.getEstimatedRunTime()
417               && progress == data.getProgress()) {
418             // Previous stats are same as same stats
419             if (data.notHeartbeatedInAWhile(now)) {
420               // Stats have stagnated for a while, simulate heart-beat.
421               TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
422               taskAttemptStatus.id = runningTaskAttemptID;
423               taskAttemptStatus.progress = progress;
424               taskAttemptStatus.taskState = taskAttempt.getState();
425               // Now simulate the heart-beat
426               handleAttempt(taskAttemptStatus);
427             }
428           } else {
429             // Stats have changed - update our data structure
430             data.setEstimatedRunTime(estimatedRunTime);
431             data.setProgress(progress);
432             data.resetHeartBeatTime(now);
433           }
434         }
435 
436         if (estimatedEndTime < now) {
437           return PROGRESS_IS_GOOD;
438         }
439 
440         if (estimatedReplacementEndTime >= estimatedEndTime) {
441           return TOO_LATE_TO_SPECULATE;
442         }
443 
444         result = estimatedEndTime - estimatedReplacementEndTime;
445       }
446     }
447 
448     // If we are here, there's at most one task attempt.
449     if (numberRunningAttempts == 0) {
450       return NOT_RUNNING;
451     }
452 
453 
454 
455     if (acceptableRuntime == Long.MIN_VALUE) {
456       acceptableRuntime = estimator.thresholdRuntime(taskID);
457       if (acceptableRuntime == Long.MAX_VALUE) {
458         return ON_SCHEDULE;
459       }
460     }
461 
462     return result;
463   }
464 
465   //Add attempt to a given Task.
addSpeculativeAttempt(TaskId taskID)466   protected void addSpeculativeAttempt(TaskId taskID) {
467     LOG.info
468         ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
469     eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
470     mayHaveSpeculated.add(taskID);
471   }
472 
473   @Override
handle(SpeculatorEvent event)474   public void handle(SpeculatorEvent event) {
475     processSpeculatorEvent(event);
476   }
477 
478 
maybeScheduleAMapSpeculation()479   private int maybeScheduleAMapSpeculation() {
480     return maybeScheduleASpeculation(TaskType.MAP);
481   }
482 
maybeScheduleAReduceSpeculation()483   private int maybeScheduleAReduceSpeculation() {
484     return maybeScheduleASpeculation(TaskType.REDUCE);
485   }
486 
maybeScheduleASpeculation(TaskType type)487   private int maybeScheduleASpeculation(TaskType type) {
488     int successes = 0;
489 
490     long now = clock.getTime();
491 
492     ConcurrentMap<JobId, AtomicInteger> containerNeeds
493         = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;
494 
495     for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
496       // This race conditon is okay.  If we skip a speculation attempt we
497       //  should have tried because the event that lowers the number of
498       //  containers needed to zero hasn't come through, it will next time.
499       // Also, if we miss the fact that the number of containers needed was
500       //  zero but increased due to a failure it's not too bad to launch one
501       //  container prematurely.
502       if (jobEntry.getValue().get() > 0) {
503         continue;
504       }
505 
506       int numberSpeculationsAlready = 0;
507       int numberRunningTasks = 0;
508 
509       // loop through the tasks of the kind
510       Job job = context.getJob(jobEntry.getKey());
511 
512       Map<TaskId, Task> tasks = job.getTasks(type);
513 
514       int numberAllowedSpeculativeTasks
515           = (int) Math.max(minimumAllowedSpeculativeTasks,
516               proportionTotalTasksSpeculatable * tasks.size());
517 
518       TaskId bestTaskID = null;
519       long bestSpeculationValue = -1L;
520 
521       // this loop is potentially pricey.
522       // TODO track the tasks that are potentially worth looking at
523       for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
524         long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
525 
526         if (mySpeculationValue == ALREADY_SPECULATING) {
527           ++numberSpeculationsAlready;
528         }
529 
530         if (mySpeculationValue != NOT_RUNNING) {
531           ++numberRunningTasks;
532         }
533 
534         if (mySpeculationValue > bestSpeculationValue) {
535           bestTaskID = taskEntry.getKey();
536           bestSpeculationValue = mySpeculationValue;
537         }
538       }
539       numberAllowedSpeculativeTasks
540           = (int) Math.max(numberAllowedSpeculativeTasks,
541               proportionRunningTasksSpeculatable * numberRunningTasks);
542 
543       // If we found a speculation target, fire it off
544       if (bestTaskID != null
545           && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
546         addSpeculativeAttempt(bestTaskID);
547         ++successes;
548       }
549     }
550 
551     return successes;
552   }
553 
computeSpeculations()554   private int computeSpeculations() {
555     // We'll try to issue one map and one reduce speculation per job per run
556     return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
557   }
558 
559   static class TaskAttemptHistoryStatistics {
560 
561     private long estimatedRunTime;
562     private float progress;
563     private long lastHeartBeatTime;
564 
TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, long nonProgressStartTime)565     public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
566         long nonProgressStartTime) {
567       this.estimatedRunTime = estimatedRunTime;
568       this.progress = progress;
569       resetHeartBeatTime(nonProgressStartTime);
570     }
571 
getEstimatedRunTime()572     public long getEstimatedRunTime() {
573       return this.estimatedRunTime;
574     }
575 
getProgress()576     public float getProgress() {
577       return this.progress;
578     }
579 
setEstimatedRunTime(long estimatedRunTime)580     public void setEstimatedRunTime(long estimatedRunTime) {
581       this.estimatedRunTime = estimatedRunTime;
582     }
583 
setProgress(float progress)584     public void setProgress(float progress) {
585       this.progress = progress;
586     }
587 
notHeartbeatedInAWhile(long now)588     public boolean notHeartbeatedInAWhile(long now) {
589       if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) {
590         return false;
591       } else {
592         resetHeartBeatTime(now);
593         return true;
594       }
595     }
596 
resetHeartBeatTime(long lastHeartBeatTime)597     public void resetHeartBeatTime(long lastHeartBeatTime) {
598       this.lastHeartBeatTime = lastHeartBeatTime;
599     }
600   }
601 
602   @VisibleForTesting
getSoonestRetryAfterNoSpeculate()603   public long getSoonestRetryAfterNoSpeculate() {
604     return soonestRetryAfterNoSpeculate;
605   }
606 
607   @VisibleForTesting
getSoonestRetryAfterSpeculate()608   public long getSoonestRetryAfterSpeculate() {
609     return soonestRetryAfterSpeculate;
610   }
611 
612   @VisibleForTesting
getProportionRunningTasksSpeculatable()613   public double getProportionRunningTasksSpeculatable() {
614     return proportionRunningTasksSpeculatable;
615   }
616 
617   @VisibleForTesting
getProportionTotalTasksSpeculatable()618   public double getProportionTotalTasksSpeculatable() {
619     return proportionTotalTasksSpeculatable;
620   }
621 
622   @VisibleForTesting
getMinimumAllowedSpeculativeTasks()623   public int getMinimumAllowedSpeculativeTasks() {
624     return minimumAllowedSpeculativeTasks;
625   }
626 }
627