1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.v2.app.speculate; 20 21 import java.lang.reflect.Constructor; 22 import java.lang.reflect.InvocationTargetException; 23 import java.util.HashSet; 24 import java.util.Map; 25 import java.util.Set; 26 import java.util.concurrent.BlockingQueue; 27 import java.util.concurrent.ConcurrentHashMap; 28 import java.util.concurrent.ConcurrentMap; 29 import java.util.concurrent.LinkedBlockingQueue; 30 import java.util.concurrent.TimeUnit; 31 import java.util.concurrent.atomic.AtomicInteger; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.mapreduce.MRJobConfig; 37 import org.apache.hadoop.mapreduce.v2.api.records.JobId; 38 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; 39 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; 40 import org.apache.hadoop.mapreduce.v2.api.records.TaskId; 41 import org.apache.hadoop.mapreduce.v2.api.records.TaskType; 42 import org.apache.hadoop.mapreduce.v2.app.AppContext; 43 import org.apache.hadoop.mapreduce.v2.app.job.Job; 44 import org.apache.hadoop.mapreduce.v2.app.job.Task; 45 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; 46 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; 47 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; 48 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; 49 import org.apache.hadoop.service.AbstractService; 50 import org.apache.hadoop.yarn.event.EventHandler; 51 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 52 import org.apache.hadoop.yarn.util.Clock; 53 54 import com.google.common.annotations.VisibleForTesting; 55 56 public class DefaultSpeculator extends AbstractService implements 57 Speculator { 58 59 private static final long ON_SCHEDULE = Long.MIN_VALUE; 60 private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1; 61 private static final long TOO_NEW = Long.MIN_VALUE + 2; 62 private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3; 63 private static final long NOT_RUNNING = Long.MIN_VALUE + 4; 64 private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5; 65 66 private long soonestRetryAfterNoSpeculate; 67 private long soonestRetryAfterSpeculate; 68 private double proportionRunningTasksSpeculatable; 69 private double proportionTotalTasksSpeculatable; 70 private int minimumAllowedSpeculativeTasks; 71 72 private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class); 73 74 private final ConcurrentMap<TaskId, Boolean> runningTasks 75 = new ConcurrentHashMap<TaskId, Boolean>(); 76 77 // Used to track any TaskAttempts that aren't heart-beating for a while, so 78 // that we can aggressively speculate instead of waiting for task-timeout. 79 private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics> 80 runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId, 81 TaskAttemptHistoryStatistics>(); 82 // Regular heartbeat from tasks is every 3 secs. So if we don't get a 83 // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change 84 // in progress. 85 private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000; 86 87 // These are the current needs, not the initial needs. For each job, these 88 // record the number of attempts that exist and that are actively 89 // waiting for a container [as opposed to running or finished] 90 private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds 91 = new ConcurrentHashMap<JobId, AtomicInteger>(); 92 private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds 93 = new ConcurrentHashMap<JobId, AtomicInteger>(); 94 95 private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>(); 96 97 private final Configuration conf; 98 private AppContext context; 99 private Thread speculationBackgroundThread = null; 100 private volatile boolean stopped = false; 101 private BlockingQueue<SpeculatorEvent> eventQueue 102 = new LinkedBlockingQueue<SpeculatorEvent>(); 103 private TaskRuntimeEstimator estimator; 104 105 private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>(); 106 107 private final Clock clock; 108 109 private final EventHandler<TaskEvent> eventHandler; 110 DefaultSpeculator(Configuration conf, AppContext context)111 public DefaultSpeculator(Configuration conf, AppContext context) { 112 this(conf, context, context.getClock()); 113 } 114 DefaultSpeculator(Configuration conf, AppContext context, Clock clock)115 public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) { 116 this(conf, context, getEstimator(conf, context), clock); 117 } 118 getEstimator(Configuration conf, AppContext context)119 static private TaskRuntimeEstimator getEstimator 120 (Configuration conf, AppContext context) { 121 TaskRuntimeEstimator estimator; 122 123 try { 124 // "yarn.mapreduce.job.task.runtime.estimator.class" 125 Class<? extends TaskRuntimeEstimator> estimatorClass 126 = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, 127 LegacyTaskRuntimeEstimator.class, 128 TaskRuntimeEstimator.class); 129 130 Constructor<? extends TaskRuntimeEstimator> estimatorConstructor 131 = estimatorClass.getConstructor(); 132 133 estimator = estimatorConstructor.newInstance(); 134 135 estimator.contextualize(conf, context); 136 } catch (InstantiationException ex) { 137 LOG.error("Can't make a speculation runtime estimator", ex); 138 throw new YarnRuntimeException(ex); 139 } catch (IllegalAccessException ex) { 140 LOG.error("Can't make a speculation runtime estimator", ex); 141 throw new YarnRuntimeException(ex); 142 } catch (InvocationTargetException ex) { 143 LOG.error("Can't make a speculation runtime estimator", ex); 144 throw new YarnRuntimeException(ex); 145 } catch (NoSuchMethodException ex) { 146 LOG.error("Can't make a speculation runtime estimator", ex); 147 throw new YarnRuntimeException(ex); 148 } 149 150 return estimator; 151 } 152 153 // This constructor is designed to be called by other constructors. 154 // However, it's public because we do use it in the test cases. 155 // Normally we figure out our own estimator. DefaultSpeculator(Configuration conf, AppContext context, TaskRuntimeEstimator estimator, Clock clock)156 public DefaultSpeculator 157 (Configuration conf, AppContext context, 158 TaskRuntimeEstimator estimator, Clock clock) { 159 super(DefaultSpeculator.class.getName()); 160 161 this.conf = conf; 162 this.context = context; 163 this.estimator = estimator; 164 this.clock = clock; 165 this.eventHandler = context.getEventHandler(); 166 this.soonestRetryAfterNoSpeculate = 167 conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 168 MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE); 169 this.soonestRetryAfterSpeculate = 170 conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 171 MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE); 172 this.proportionRunningTasksSpeculatable = 173 conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 174 MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS); 175 this.proportionTotalTasksSpeculatable = 176 conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 177 MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS); 178 this.minimumAllowedSpeculativeTasks = 179 conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 180 MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS); 181 } 182 183 /* ************************************************************* */ 184 185 // This is the task-mongering that creates the two new threads -- one for 186 // processing events from the event queue and one for periodically 187 // looking for speculation opportunities 188 189 @Override serviceStart()190 protected void serviceStart() throws Exception { 191 Runnable speculationBackgroundCore 192 = new Runnable() { 193 @Override 194 public void run() { 195 while (!stopped && !Thread.currentThread().isInterrupted()) { 196 long backgroundRunStartTime = clock.getTime(); 197 try { 198 int speculations = computeSpeculations(); 199 long mininumRecomp 200 = speculations > 0 ? soonestRetryAfterSpeculate 201 : soonestRetryAfterNoSpeculate; 202 203 long wait = Math.max(mininumRecomp, 204 clock.getTime() - backgroundRunStartTime); 205 206 if (speculations > 0) { 207 LOG.info("We launched " + speculations 208 + " speculations. Sleeping " + wait + " milliseconds."); 209 } 210 211 Object pollResult 212 = scanControl.poll(wait, TimeUnit.MILLISECONDS); 213 } catch (InterruptedException e) { 214 if (!stopped) { 215 LOG.error("Background thread returning, interrupted", e); 216 } 217 return; 218 } 219 } 220 } 221 }; 222 speculationBackgroundThread = new Thread 223 (speculationBackgroundCore, "DefaultSpeculator background processing"); 224 speculationBackgroundThread.start(); 225 226 super.serviceStart(); 227 } 228 229 @Override serviceStop()230 protected void serviceStop()throws Exception { 231 stopped = true; 232 // this could be called before background thread is established 233 if (speculationBackgroundThread != null) { 234 speculationBackgroundThread.interrupt(); 235 } 236 super.serviceStop(); 237 } 238 239 @Override handleAttempt(TaskAttemptStatus status)240 public void handleAttempt(TaskAttemptStatus status) { 241 long timestamp = clock.getTime(); 242 statusUpdate(status, timestamp); 243 } 244 245 // This section is not part of the Speculator interface; it's used only for 246 // testing eventQueueEmpty()247 public boolean eventQueueEmpty() { 248 return eventQueue.isEmpty(); 249 } 250 251 // This interface is intended to be used only for test cases. scanForSpeculations()252 public void scanForSpeculations() { 253 LOG.info("We got asked to run a debug speculation scan."); 254 // debug 255 System.out.println("We got asked to run a debug speculation scan."); 256 System.out.println("There are " + scanControl.size() 257 + " events stacked already."); 258 scanControl.add(new Object()); 259 Thread.yield(); 260 } 261 262 263 /* ************************************************************* */ 264 265 // This section contains the code that gets run for a SpeculatorEvent 266 containerNeed(TaskId taskID)267 private AtomicInteger containerNeed(TaskId taskID) { 268 JobId jobID = taskID.getJobId(); 269 TaskType taskType = taskID.getTaskType(); 270 271 ConcurrentMap<JobId, AtomicInteger> relevantMap 272 = taskType == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds; 273 274 AtomicInteger result = relevantMap.get(jobID); 275 276 if (result == null) { 277 relevantMap.putIfAbsent(jobID, new AtomicInteger(0)); 278 result = relevantMap.get(jobID); 279 } 280 281 return result; 282 } 283 processSpeculatorEvent(SpeculatorEvent event)284 private synchronized void processSpeculatorEvent(SpeculatorEvent event) { 285 switch (event.getType()) { 286 case ATTEMPT_STATUS_UPDATE: 287 statusUpdate(event.getReportedStatus(), event.getTimestamp()); 288 break; 289 290 case TASK_CONTAINER_NEED_UPDATE: 291 { 292 AtomicInteger need = containerNeed(event.getTaskID()); 293 need.addAndGet(event.containersNeededChange()); 294 break; 295 } 296 297 case ATTEMPT_START: 298 { 299 LOG.info("ATTEMPT_START " + event.getTaskID()); 300 estimator.enrollAttempt 301 (event.getReportedStatus(), event.getTimestamp()); 302 break; 303 } 304 305 case JOB_CREATE: 306 { 307 LOG.info("JOB_CREATE " + event.getJobID()); 308 estimator.contextualize(getConfig(), context); 309 break; 310 } 311 } 312 } 313 314 /** 315 * Absorbs one TaskAttemptStatus 316 * 317 * @param reportedStatus the status report that we got from a task attempt 318 * that we want to fold into the speculation data for this job 319 * @param timestamp the time this status corresponds to. This matters 320 * because statuses contain progress. 321 */ statusUpdate(TaskAttemptStatus reportedStatus, long timestamp)322 protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) { 323 324 String stateString = reportedStatus.taskState.toString(); 325 326 TaskAttemptId attemptID = reportedStatus.id; 327 TaskId taskID = attemptID.getTaskId(); 328 Job job = context.getJob(taskID.getJobId()); 329 330 if (job == null) { 331 return; 332 } 333 334 Task task = job.getTask(taskID); 335 336 if (task == null) { 337 return; 338 } 339 340 estimator.updateAttempt(reportedStatus, timestamp); 341 342 if (stateString.equals(TaskAttemptState.RUNNING.name())) { 343 runningTasks.putIfAbsent(taskID, Boolean.TRUE); 344 } else { 345 runningTasks.remove(taskID, Boolean.TRUE); 346 if (!stateString.equals(TaskAttemptState.STARTING.name())) { 347 runningTaskAttemptStatistics.remove(attemptID); 348 } 349 } 350 } 351 352 /* ************************************************************* */ 353 354 // This is the code section that runs periodically and adds speculations for 355 // those jobs that need them. 356 357 358 // This can return a few magic values for tasks that shouldn't speculate: 359 // returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not 360 // considering speculating this task 361 // returns ALREADY_SPECULATING if that is true. This has priority. 362 // returns TOO_NEW if our companion task hasn't gotten any information 363 // returns PROGRESS_IS_GOOD if the task is sailing through 364 // returns NOT_RUNNING if the task is not running 365 // 366 // All of these values are negative. Any value that should be allowed to 367 // speculate is 0 or positive. speculationValue(TaskId taskID, long now)368 private long speculationValue(TaskId taskID, long now) { 369 Job job = context.getJob(taskID.getJobId()); 370 Task task = job.getTask(taskID); 371 Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts(); 372 long acceptableRuntime = Long.MIN_VALUE; 373 long result = Long.MIN_VALUE; 374 375 if (!mayHaveSpeculated.contains(taskID)) { 376 acceptableRuntime = estimator.thresholdRuntime(taskID); 377 if (acceptableRuntime == Long.MAX_VALUE) { 378 return ON_SCHEDULE; 379 } 380 } 381 382 TaskAttemptId runningTaskAttemptID = null; 383 384 int numberRunningAttempts = 0; 385 386 for (TaskAttempt taskAttempt : attempts.values()) { 387 if (taskAttempt.getState() == TaskAttemptState.RUNNING 388 || taskAttempt.getState() == TaskAttemptState.STARTING) { 389 if (++numberRunningAttempts > 1) { 390 return ALREADY_SPECULATING; 391 } 392 runningTaskAttemptID = taskAttempt.getID(); 393 394 long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID); 395 396 long taskAttemptStartTime 397 = estimator.attemptEnrolledTime(runningTaskAttemptID); 398 if (taskAttemptStartTime > now) { 399 // This background process ran before we could process the task 400 // attempt status change that chronicles the attempt start 401 return TOO_NEW; 402 } 403 404 long estimatedEndTime = estimatedRunTime + taskAttemptStartTime; 405 406 long estimatedReplacementEndTime 407 = now + estimator.estimatedNewAttemptRuntime(taskID); 408 409 float progress = taskAttempt.getProgress(); 410 TaskAttemptHistoryStatistics data = 411 runningTaskAttemptStatistics.get(runningTaskAttemptID); 412 if (data == null) { 413 runningTaskAttemptStatistics.put(runningTaskAttemptID, 414 new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now)); 415 } else { 416 if (estimatedRunTime == data.getEstimatedRunTime() 417 && progress == data.getProgress()) { 418 // Previous stats are same as same stats 419 if (data.notHeartbeatedInAWhile(now)) { 420 // Stats have stagnated for a while, simulate heart-beat. 421 TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); 422 taskAttemptStatus.id = runningTaskAttemptID; 423 taskAttemptStatus.progress = progress; 424 taskAttemptStatus.taskState = taskAttempt.getState(); 425 // Now simulate the heart-beat 426 handleAttempt(taskAttemptStatus); 427 } 428 } else { 429 // Stats have changed - update our data structure 430 data.setEstimatedRunTime(estimatedRunTime); 431 data.setProgress(progress); 432 data.resetHeartBeatTime(now); 433 } 434 } 435 436 if (estimatedEndTime < now) { 437 return PROGRESS_IS_GOOD; 438 } 439 440 if (estimatedReplacementEndTime >= estimatedEndTime) { 441 return TOO_LATE_TO_SPECULATE; 442 } 443 444 result = estimatedEndTime - estimatedReplacementEndTime; 445 } 446 } 447 448 // If we are here, there's at most one task attempt. 449 if (numberRunningAttempts == 0) { 450 return NOT_RUNNING; 451 } 452 453 454 455 if (acceptableRuntime == Long.MIN_VALUE) { 456 acceptableRuntime = estimator.thresholdRuntime(taskID); 457 if (acceptableRuntime == Long.MAX_VALUE) { 458 return ON_SCHEDULE; 459 } 460 } 461 462 return result; 463 } 464 465 //Add attempt to a given Task. addSpeculativeAttempt(TaskId taskID)466 protected void addSpeculativeAttempt(TaskId taskID) { 467 LOG.info 468 ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID); 469 eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT)); 470 mayHaveSpeculated.add(taskID); 471 } 472 473 @Override handle(SpeculatorEvent event)474 public void handle(SpeculatorEvent event) { 475 processSpeculatorEvent(event); 476 } 477 478 maybeScheduleAMapSpeculation()479 private int maybeScheduleAMapSpeculation() { 480 return maybeScheduleASpeculation(TaskType.MAP); 481 } 482 maybeScheduleAReduceSpeculation()483 private int maybeScheduleAReduceSpeculation() { 484 return maybeScheduleASpeculation(TaskType.REDUCE); 485 } 486 maybeScheduleASpeculation(TaskType type)487 private int maybeScheduleASpeculation(TaskType type) { 488 int successes = 0; 489 490 long now = clock.getTime(); 491 492 ConcurrentMap<JobId, AtomicInteger> containerNeeds 493 = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds; 494 495 for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) { 496 // This race conditon is okay. If we skip a speculation attempt we 497 // should have tried because the event that lowers the number of 498 // containers needed to zero hasn't come through, it will next time. 499 // Also, if we miss the fact that the number of containers needed was 500 // zero but increased due to a failure it's not too bad to launch one 501 // container prematurely. 502 if (jobEntry.getValue().get() > 0) { 503 continue; 504 } 505 506 int numberSpeculationsAlready = 0; 507 int numberRunningTasks = 0; 508 509 // loop through the tasks of the kind 510 Job job = context.getJob(jobEntry.getKey()); 511 512 Map<TaskId, Task> tasks = job.getTasks(type); 513 514 int numberAllowedSpeculativeTasks 515 = (int) Math.max(minimumAllowedSpeculativeTasks, 516 proportionTotalTasksSpeculatable * tasks.size()); 517 518 TaskId bestTaskID = null; 519 long bestSpeculationValue = -1L; 520 521 // this loop is potentially pricey. 522 // TODO track the tasks that are potentially worth looking at 523 for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) { 524 long mySpeculationValue = speculationValue(taskEntry.getKey(), now); 525 526 if (mySpeculationValue == ALREADY_SPECULATING) { 527 ++numberSpeculationsAlready; 528 } 529 530 if (mySpeculationValue != NOT_RUNNING) { 531 ++numberRunningTasks; 532 } 533 534 if (mySpeculationValue > bestSpeculationValue) { 535 bestTaskID = taskEntry.getKey(); 536 bestSpeculationValue = mySpeculationValue; 537 } 538 } 539 numberAllowedSpeculativeTasks 540 = (int) Math.max(numberAllowedSpeculativeTasks, 541 proportionRunningTasksSpeculatable * numberRunningTasks); 542 543 // If we found a speculation target, fire it off 544 if (bestTaskID != null 545 && numberAllowedSpeculativeTasks > numberSpeculationsAlready) { 546 addSpeculativeAttempt(bestTaskID); 547 ++successes; 548 } 549 } 550 551 return successes; 552 } 553 computeSpeculations()554 private int computeSpeculations() { 555 // We'll try to issue one map and one reduce speculation per job per run 556 return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation(); 557 } 558 559 static class TaskAttemptHistoryStatistics { 560 561 private long estimatedRunTime; 562 private float progress; 563 private long lastHeartBeatTime; 564 TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, long nonProgressStartTime)565 public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, 566 long nonProgressStartTime) { 567 this.estimatedRunTime = estimatedRunTime; 568 this.progress = progress; 569 resetHeartBeatTime(nonProgressStartTime); 570 } 571 getEstimatedRunTime()572 public long getEstimatedRunTime() { 573 return this.estimatedRunTime; 574 } 575 getProgress()576 public float getProgress() { 577 return this.progress; 578 } 579 setEstimatedRunTime(long estimatedRunTime)580 public void setEstimatedRunTime(long estimatedRunTime) { 581 this.estimatedRunTime = estimatedRunTime; 582 } 583 setProgress(float progress)584 public void setProgress(float progress) { 585 this.progress = progress; 586 } 587 notHeartbeatedInAWhile(long now)588 public boolean notHeartbeatedInAWhile(long now) { 589 if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) { 590 return false; 591 } else { 592 resetHeartBeatTime(now); 593 return true; 594 } 595 } 596 resetHeartBeatTime(long lastHeartBeatTime)597 public void resetHeartBeatTime(long lastHeartBeatTime) { 598 this.lastHeartBeatTime = lastHeartBeatTime; 599 } 600 } 601 602 @VisibleForTesting getSoonestRetryAfterNoSpeculate()603 public long getSoonestRetryAfterNoSpeculate() { 604 return soonestRetryAfterNoSpeculate; 605 } 606 607 @VisibleForTesting getSoonestRetryAfterSpeculate()608 public long getSoonestRetryAfterSpeculate() { 609 return soonestRetryAfterSpeculate; 610 } 611 612 @VisibleForTesting getProportionRunningTasksSpeculatable()613 public double getProportionRunningTasksSpeculatable() { 614 return proportionRunningTasksSpeculatable; 615 } 616 617 @VisibleForTesting getProportionTotalTasksSpeculatable()618 public double getProportionTotalTasksSpeculatable() { 619 return proportionTotalTasksSpeculatable; 620 } 621 622 @VisibleForTesting getMinimumAllowedSpeculativeTasks()623 public int getMinimumAllowedSpeculativeTasks() { 624 return minimumAllowedSpeculativeTasks; 625 } 626 } 627