/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.app.speculate;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;

/**
 * A task-runtime estimator that linearly extrapolates a running attempt's
 * total runtime from its reported progress:
 * {@code estimate = elapsed / progress}.
 *
 * <p>Per-attempt estimates and a crude variance figure are cached in
 * concurrent maps keyed by {@link TaskAttempt} and served back through
 * {@link #estimatedRuntime(TaskAttemptId)} and
 * {@link #runtimeEstimateVariance(TaskAttemptId)}; both return {@code -1L}
 * when no value is known.  The {@code startTimes} map and {@code context}
 * field are inherited from {@code StartEndTimesBase} (not visible here) —
 * presumably {@code startTimes} records each attempt's launch timestamp;
 * confirm against the superclass.
 */
public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {

  // Latest runtime estimate (ms) per running attempt; -1 means "unknown".
  // ConcurrentMap so first-registration can use an atomic putIfAbsent.
  private final ConcurrentMap<TaskAttempt, AtomicLong> attemptRuntimeEstimates
      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
  // Companion variance estimate (ms) per running attempt; -1 means "unknown".
  private final ConcurrentMap<TaskAttempt, AtomicLong>
      attemptRuntimeEstimateVariances
      = new ConcurrentHashMap<TaskAttempt, AtomicLong>();

  /**
   * Folds a status report into the estimator.  Delegates bookkeeping to the
   * superclass, then — if the attempt is still RUNNING — refreshes the cached
   * runtime and variance estimates for it.
   *
   * @param status the attempt's latest reported status (id + progress)
   * @param timestamp the time the report was received
   */
  @Override
  public void updateAttempt(TaskAttemptStatus status, long timestamp) {
    super.updateAttempt(status, timestamp);

    TaskAttemptId attemptID = status.id;
    TaskId taskID = attemptID.getTaskId();
    JobId jobID = taskID.getJobId();
    Job job = context.getJob(jobID);

    if (job == null) {
      return;
    }

    Task task = job.getTask(taskID);

    if (task == null) {
      return;
    }

    TaskAttempt taskAttempt = task.getAttempt(attemptID);

    if (taskAttempt == null) {
      return;
    }

    Long boxedStart = startTimes.get(attemptID);
    long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;

    // We need to do two things.
    //  1: If this is a completion, we accumulate statistics in the superclass
    //  2: If this is not a completion, we learn more about it.

    // This is not a completion, but we're cooking.
    //
    if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
      // Register containers for this attempt on first sight.  putIfAbsent
      // keeps the registration atomic: a plain get/put pair could overwrite
      // a container installed by a concurrent status update, losing its
      // value.  Both maps now use the same idiom.
      AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
      if (estimateContainer == null) {
        attemptRuntimeEstimates.putIfAbsent(taskAttempt, new AtomicLong());
        estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
      }

      AtomicLong estimateVarianceContainer
          = attemptRuntimeEstimateVariances.get(taskAttempt);
      if (estimateVarianceContainer == null) {
        attemptRuntimeEstimateVariances
            .putIfAbsent(taskAttempt, new AtomicLong());
        estimateVarianceContainer
            = attemptRuntimeEstimateVariances.get(taskAttempt);
      }

      long estimate = -1;
      long varianceEstimate = -1;

      // This code assumes that we'll never consider starting a third
      // speculative task attempt if two are already running for this task
      if (start > 0 && timestamp > start) {
        // Linear extrapolation; progress is floored at 0.0001 to avoid a
        // divide-by-(near-)zero blowing the estimate up to infinity.
        estimate = (long) ((timestamp - start)
            / Math.max(0.0001, status.progress));
        varianceEstimate = (long) (estimate * status.progress / 10);
      }
      if (estimateContainer != null) {
        estimateContainer.set(estimate);
      }
      if (estimateVarianceContainer != null) {
        estimateVarianceContainer.set(varianceEstimate);
      }
    }
  }

  /**
   * Looks up the cached per-attempt value in {@code data} for the attempt
   * identified by {@code attemptID}.
   *
   * @param data one of the two per-attempt caches
   * @param attemptID the attempt whose value is wanted
   * @return the cached value, or {@code -1L} if the job, task, attempt, or
   *         cached entry cannot be found
   */
  private long storedPerAttemptValue
       (Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
    TaskId taskID = attemptID.getTaskId();
    JobId jobID = taskID.getJobId();
    Job job = context.getJob(jobID);

    // Guard against an unknown job, consistent with updateAttempt(); the
    // original dereferenced job unconditionally and could NPE here.
    if (job == null) {
      return -1L;
    }

    Task task = job.getTask(taskID);

    if (task == null) {
      return -1L;
    }

    TaskAttempt taskAttempt = task.getAttempt(attemptID);

    if (taskAttempt == null) {
      return -1L;
    }

    AtomicLong estimate = data.get(taskAttempt);

    return estimate == null ? -1L : estimate.get();
  }

  /**
   * @return the latest runtime estimate (ms) for the attempt, or {@code -1L}
   *         if unknown
   */
  @Override
  public long estimatedRuntime(TaskAttemptId attemptID) {
    return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
  }

  /**
   * @return the latest runtime-estimate variance (ms) for the attempt, or
   *         {@code -1L} if unknown
   */
  @Override
  public long runtimeEstimateVariance(TaskAttemptId attemptID) {
    return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
  }
}